This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 00d90033d9 NIFI-14616 Added logging when Components are not being
restarted when updating Flow (#9977)
00d90033d9 is described below
commit 00d90033d9f6aae969691ab56c92869094c0233c
Author: Mark Payne <[email protected]>
AuthorDate: Thu May 29 09:03:57 2025 -0400
NIFI-14616 Added logging when Components are not being restarted when
updating Flow (#9977)
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/controller/StandardProcessorNode.java | 14 +-
.../registry/flow/mapping/FlowMappingOptions.java | 5 +-
.../serialization/AffectedComponentSet.java | 141 +++++++++++++--------
.../serialization/ComponentSetFilter.java | 2 +-
.../serialization/VersionedFlowSynchronizer.java | 34 +++--
.../serialization/TestAffectedComponentSet.java | 86 +++++++++++++
6 files changed, 212 insertions(+), 70 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 6afb9492b3..1eb7a05dee 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -1380,7 +1380,7 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
@Override
public void enable() {
- desiredState = ScheduledState.STOPPED;
+ setDesiredState(ScheduledState.STOPPED);
final boolean updated =
scheduledState.compareAndSet(ScheduledState.DISABLED, ScheduledState.STOPPED);
if (updated) {
@@ -1392,7 +1392,7 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
@Override
public void disable() {
- desiredState = ScheduledState.DISABLED;
+ setDesiredState(ScheduledState.DISABLED);
final boolean updated =
scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED);
if (updated) {
@@ -1402,6 +1402,10 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
}
}
+ private void setDesiredState(final ScheduledState desiredState) {
+ this.desiredState = desiredState;
+ LOG.info("Desired State for {} now set to {}", this, desiredState);
+ }
/**
* Will idempotently start the processor using the following sequence: <i>
@@ -1480,10 +1484,10 @@ public class StandardProcessorNode extends
ProcessorNode implements Connectable
if (currentState == ScheduledState.STOPPED) {
starting =
this.scheduledState.compareAndSet(ScheduledState.STOPPED, scheduledState);
if (starting) {
- this.desiredState = desiredState;
+ setDesiredState(desiredState);
}
} else if (currentState == ScheduledState.STOPPING &&
!failIfStopping) {
- this.desiredState = desiredState;
+ setDesiredState(desiredState);
return;
} else {
starting = false;
@@ -1800,7 +1804,7 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
final Processor processor = processorRef.get().getProcessor();
LOG.debug("Stopping processor: {}", this);
- desiredState = ScheduledState.STOPPED;
+ setDesiredState(ScheduledState.STOPPED);
final CompletableFuture<Void> future = new CompletableFuture<>();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/FlowMappingOptions.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/FlowMappingOptions.java
index 8a5b24deb0..b2bab4f777 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/FlowMappingOptions.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/FlowMappingOptions.java
@@ -177,6 +177,7 @@ public class FlowMappingOptions {
/**
* Specifies whether or not the identifier of a Flow Registry Client
should be included in the VersionedFlowCoordinates of a Versioned Process Group
+ *
* @param mapFlowRegistryClientId <code>true</code> if the Registry ID
of the Flow Registry Client should be mapped, <code>false</code> otherwise
* @return the builder
*/
@@ -195,8 +196,8 @@ public class FlowMappingOptions {
*
* @return the FlowMappingOptions
* @throws NullPointerException if the {@link
#stateLookup(VersionedComponentStateLookup) StateLookup} is not set, the
- * {@link #componentIdLookup(ComponentIdLookup) ComponentIdLookup} is
not set, or if {@link #mapSensitiveConfiguration(boolean)
mapSensitiveConfiguration}
- * is set to true but the {@link
#sensitiveValueEncryptor(SensitiveValueEncryptor) SensitiveValueEncryptor} has
not been set
+ * {@link
#componentIdLookup(ComponentIdLookup) ComponentIdLookup} is not set, or if
{@link #mapSensitiveConfiguration(boolean) mapSensitiveConfiguration}
+ * is set to true but the {@link
#sensitiveValueEncryptor(SensitiveValueEncryptor) SensitiveValueEncryptor} has
not been set
*/
public FlowMappingOptions build() {
requireNonNull(stateLookup, "State Lookup must be set");
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
index d3b6e0b6f9..1122c2b32c 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
@@ -59,6 +59,7 @@ import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
@@ -455,22 +456,18 @@ public class AffectedComponentSet {
}
final ExecutionEngine executionEngine = start.getExecutionEngine();
- switch (executionEngine) {
- case STATELESS:
- return start;
- case INHERITED:
- return getStatelessGroup(start.getParent());
- case STANDARD:
- default:
- return null;
- }
+ return switch (executionEngine) {
+ case STATELESS -> start;
+ case INHERITED -> getStatelessGroup(start.getParent());
+ default -> null;
+ };
}
private void addComponentsForParameterUpdate(final FlowDifference
difference) {
final DifferenceType differenceType = difference.getDifferenceType();
final Optional<String> optionalParameterName =
difference.getFieldName();
- if (!optionalParameterName.isPresent()) {
+ if (optionalParameterName.isEmpty()) {
logger.warn("Encountered a Flow Difference {} with Difference Type
of {} but no indication as to which parameter was updated.", difference,
differenceType);
return;
}
@@ -505,21 +502,13 @@ public class AffectedComponentSet {
}
private Connectable getConnectable(final ConnectableComponentType type,
final String identifier) {
- switch (type) {
- case FUNNEL:
- return flowManager.getFunnel(identifier);
- case INPUT_PORT:
- return flowManager.getInputPort(identifier);
- case OUTPUT_PORT:
- return flowManager.getOutputPort(identifier);
- case PROCESSOR:
- return flowManager.getProcessorNode(identifier);
- case REMOTE_INPUT_PORT:
- case REMOTE_OUTPUT_PORT:
- return
flowManager.getRootGroup().findRemoteGroupPort(identifier);
- default:
- return null;
- }
+ return switch (type) {
+ case FUNNEL -> flowManager.getFunnel(identifier);
+ case INPUT_PORT -> flowManager.getInputPort(identifier);
+ case OUTPUT_PORT -> flowManager.getOutputPort(identifier);
+ case PROCESSOR -> flowManager.getProcessorNode(identifier);
+ case REMOTE_INPUT_PORT, REMOTE_OUTPUT_PORT ->
flowManager.getRootGroup().findRemoteGroupPort(identifier);
+ };
}
private void addAffectedComponents(final VersionedComponent
versionedComponent) {
@@ -623,20 +612,71 @@ public class AffectedComponentSet {
processors.forEach(processor ->
processor.getProcessGroup().startProcessor(processor, false));
reportingTasks.forEach(flowController::startReportingTask);
flowAnalysisRules.forEach(flowController::enableFlowAnalysisRule);
- statelessProcessGroups.forEach(group -> group.startProcessing());
+ statelessProcessGroups.forEach(ProcessGroup::startProcessing);
+ }
+
+ public int getComponentCount() {
+ return inputPorts.size() + outputPorts.size() +
remoteInputPorts.size() + remoteOutputPorts.size() +
+ processors.size() + reportingTasks.size() +
flowAnalysisRules.size() + controllerServices.size() +
+ flowRegistryClients.size() + statelessProcessGroups.size();
+ }
+
+ public AffectedComponentSet removeComponents(final ComponentSetFilter
filter) {
+ final AffectedComponentSet removed = new
AffectedComponentSet(flowController);
+ removeMatching(inputPorts,
filter::testInputPort).forEach(removed::addInputPort);
+ removeMatching(outputPorts,
filter::testOutputPort).forEach(removed::addOutputPort);
+ removeMatching(remoteInputPorts,
filter::testRemoteInputPort).forEach(removed::addRemoteInputPort);
+ removeMatching(remoteOutputPorts,
filter::testRemoteOutputPort).forEach(removed::addRemoteOutputPort);
+ removeMatching(processors,
filter::testProcessor).forEach(removed::addProcessor);
+ removeMatching(controllerServices,
filter::testControllerService).forEach(removed::addControllerServiceWithoutReferences);
+ removeMatching(reportingTasks,
filter::testReportingTask).forEach(removed::addReportingTask);
+ removeMatching(flowAnalysisRules,
filter::testFlowAnalysisRule).forEach(removed::addFlowAnalysisRule);
+ removeMatching(flowRegistryClients,
filter::testFlowRegistryClient).forEach(removed::addFlowRegistryClient);
+ removeMatching(statelessProcessGroups,
filter::testStatelessGroup).forEach(removed::addStatelessGroup);
+ return removed;
+ }
+
+ private <T> Set<T> removeMatching(final Set<T> set, final Predicate<T>
test) {
+ final Set<T> toRemove = new HashSet<>();
+ for (final T element : set) {
+ if (test.test(element)) {
+ toRemove.add(element);
+ }
+ }
+
+ set.removeAll(toRemove);
+ return toRemove;
}
- public void removeComponents(final ComponentSetFilter filter) {
- inputPorts.removeIf(filter::testInputPort);
- outputPorts.removeIf(filter::testOutputPort);
- remoteInputPorts.removeIf(filter::testRemoteInputPort);
- remoteOutputPorts.removeIf(filter::testRemoteOutputPort);
- processors.removeIf(filter::testProcessor);
- controllerServices.removeIf(filter::testControllerService);
- reportingTasks.removeIf(filter::testReportingTask);
- flowAnalysisRules.removeIf(filter::testFlowAnalysisRule);
- flowRegistryClients.removeIf(filter::testFlowRegistryClient);
- statelessProcessGroups.removeIf(filter::testStatelessGroup);
+ /**
+ * Returns an AffectedComponentSet that contains all components that are
in this but not in the given AffectedComponentSet
+ * @param other the AffectedComponentSet to subtract from this
+ * @return the AffectedComponentSet representing the difference
+ */
+ public AffectedComponentSet minus(final AffectedComponentSet other) {
+ final AffectedComponentSet result = new
AffectedComponentSet(flowController);
+ inputPorts.stream().filter(port ->
!other.inputPorts.contains(port)).forEach(result::addInputPort);
+ outputPorts.stream().filter(port ->
!other.outputPorts.contains(port)).forEach(result::addOutputPort);
+ remoteInputPorts.stream().filter(port ->
!other.remoteInputPorts.contains(port)).forEach(result::addRemoteInputPort);
+ remoteOutputPorts.stream().filter(port ->
!other.remoteOutputPorts.contains(port)).forEach(result::addRemoteOutputPort);
+ processors.stream().filter(processor ->
!other.processors.contains(processor)).forEach(result::addProcessor);
+ controllerServices.stream().filter(controllerService ->
!other.controllerServices.contains(controllerService)).forEach(result::addControllerService);
+ reportingTasks.stream().filter(task ->
!other.reportingTasks.contains(task)).forEach(result::addReportingTask);
+ flowAnalysisRules.stream().filter(rule ->
!other.flowAnalysisRules.contains(rule)).forEach(result::addFlowAnalysisRule);
+ flowRegistryClients.stream().filter(client ->
!other.flowRegistryClients.contains(client)).forEach(result::addFlowRegistryClient);
+ statelessProcessGroups.stream().filter(group ->
!other.statelessProcessGroups.contains(group)).forEach(result::addStatelessGroup);
+
+ return result;
+ }
+
+ /**
+ * Returns a boolean indicating whether or not this AffectedComponentSet
is empty
+ * @return <code>true</code> if the AffectedComponentSet is empty,
<code>false</code> otherwise
+ */
+ public boolean isEmpty() {
+ return inputPorts.isEmpty() && outputPorts.isEmpty() &&
remoteInputPorts.isEmpty() && remoteOutputPorts.isEmpty()
+ && processors.isEmpty() && controllerServices.isEmpty() &&
reportingTasks.isEmpty() && flowAnalysisRules.isEmpty()
+ && flowRegistryClients.isEmpty() &&
statelessProcessGroups.isEmpty();
}
/**
@@ -644,7 +684,7 @@ public class AffectedComponentSet {
* that one or more components referred to by the AffectedComponentSet no
longer exist (for example, there was a dataflow update that removed a
Processor, so that Processor no longer exists).
*
* @return an AffectedComponentSet that represents all components within
this AffectedComponentSet that currently exist within the NiFi instance. The
components contained by the returned
- * AffectedComponentSetwill always be a subset or equal to the set of
components contained by this.
+ * AffectedComponentSet will always be a subset or equal to the set of
components contained by this.
*/
public AffectedComponentSet toExistingSet() {
final ControllerServiceProvider serviceProvider =
flowController.getControllerServiceProvider();
@@ -688,18 +728,12 @@ public class AffectedComponentSet {
}
private boolean isStartable(final ComponentNode componentNode) {
- if (componentNode == null) {
- return false;
- }
-
- if (componentNode instanceof ProcessorNode) {
- return ((ProcessorNode) componentNode).getScheduledState() !=
ScheduledState.DISABLED;
- }
- if (componentNode instanceof ReportingTaskNode) {
- return ((ReportingTaskNode) componentNode).getScheduledState() !=
ScheduledState.DISABLED;
- }
-
- return true;
+ return switch (componentNode) {
+ case null -> false;
+ case ProcessorNode processorNode ->
processorNode.getScheduledState() != ScheduledState.DISABLED;
+ case ReportingTaskNode reportingTaskNode ->
reportingTaskNode.getScheduledState() != ScheduledState.DISABLED;
+ default -> true;
+ };
}
private boolean isStartable(final ProcessGroup group) {
@@ -726,7 +760,7 @@ public class AffectedComponentSet {
}
public void stop() {
- logger.info("Stopping the following components: {}", this);
+ logger.info("Stopping the following {} components: {}",
getComponentCount(), this);
final long start = System.currentTimeMillis();
inputPorts.forEach(port -> port.getProcessGroup().stopInputPort(port));
@@ -736,7 +770,7 @@ public class AffectedComponentSet {
processors.forEach(processor ->
processor.getProcessGroup().stopProcessor(processor));
reportingTasks.forEach(flowController::stopReportingTask);
flowAnalysisRules.forEach(flowController::disableFlowAnalysisRule);
- statelessProcessGroups.forEach(group -> group.stopProcessing());
+ statelessProcessGroups.forEach(ProcessGroup::stopProcessing);
waitForConnectablesStopped();
@@ -817,6 +851,7 @@ public class AffectedComponentSet {
return true;
}
+
@Override
public String toString() {
return "AffectedComponentSet[" +
@@ -826,7 +861,7 @@ public class AffectedComponentSet {
", remoteOutputPorts=" + remoteOutputPorts +
", processors=" + processors +
", parameterProviders=" + parameterProviders +
- ", flowRegistryClients=" + flowRegistryClients +
+ ", flowRegistryClients=" + flowRegistryClients +
", controllerServices=" + controllerServices +
", reportingTasks=" + reportingTasks +
", flowAnalysisRules=" + flowAnalysisRules +
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ComponentSetFilter.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ComponentSetFilter.java
index 027ad0b19d..3dab40ea51 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ComponentSetFilter.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ComponentSetFilter.java
@@ -48,7 +48,7 @@ public interface ComponentSetFilter {
boolean testStatelessGroup(ProcessGroup group);
- default ComponentSetFilter reverse() {
+ default ComponentSetFilter invert() {
final ComponentSetFilter original = this;
return new ComponentSetFilter() {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index 76b71636b2..678dec7ea3 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -220,16 +220,8 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
synchronizeFlow(controller, existingDataFlow, proposedFlow,
affectedComponents);
} finally {
- // We have to call toExistingSet() here because some of the
components that existed in the active set may no longer exist,
- // so attempting to start them will fail.
-
if (!existingFlowEmpty) {
- final AffectedComponentSet startable =
activeSet.toExistingSet().toStartableSet();
-
- final ComponentSetFilter runningComponentFilter = new
RunningComponentSetFilter(proposedFlow.getVersionedDataflow());
- final ComponentSetFilter stoppedComponentFilter =
runningComponentFilter.reverse();
- startable.removeComponents(stoppedComponentFilter);
- startable.start();
+ restart(activeSet, proposedFlow.getVersionedDataflow());
}
}
@@ -237,6 +229,30 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
logger.info("Successfully synchronized dataflow with the proposed flow
in {} millis", millis);
}
+ private void restart(final AffectedComponentSet activeSet, final
VersionedDataflow versionedDataflow) {
+ final AffectedComponentSet existing = activeSet.toExistingSet();
+ final AffectedComponentSet noLongerExisting =
activeSet.minus(existing);
+ if (!noLongerExisting.isEmpty()) {
+ logger.info("After synchronizing flow, the followinging components
will not be restarted because they no longer exist: {}", noLongerExisting);
+ }
+
+ final AffectedComponentSet startable = existing.toStartableSet();
+ final AffectedComponentSet notStartable = existing.minus(startable);
+ if (!notStartable.isEmpty()) {
+ logger.info("After synchronizing flow, the following components
will not be restarted because they are not in a startable state: {}",
notStartable);
+ }
+
+ final ComponentSetFilter runningComponentFilter = new
RunningComponentSetFilter(versionedDataflow);
+ final ComponentSetFilter stoppedComponentFilter =
runningComponentFilter.invert();
+ final AffectedComponentSet stoppedComponents =
startable.removeComponents(stoppedComponentFilter);
+ if (!stoppedComponents.isEmpty()) {
+ logger.info("After synchronizing flow, the following components
will not be restarted because the proposed flow indicates that they are not
running: {}", stoppedComponents);
+ }
+
+ logger.info("After synchronizing flow, restarting {} components",
startable.getComponentCount());
+ startable.start();
+ }
+
private void verifyNoConnectionsWithDataRemoved(final DataFlow
existingFlow, final DataFlow proposedFlow, final FlowController controller,
final FlowComparison flowComparison) {
logger.debug("Checking that no connections were removed that have
data");
final FlowInheritabilityCheck processGroupInheritableCheck = new
ConnectionMissingCheck(flowComparison);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/TestAffectedComponentSet.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/TestAffectedComponentSet.java
new file mode 100644
index 0000000000..69db20bd76
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/TestAffectedComponentSet.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.serialization;
+
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+public class TestAffectedComponentSet {
+
+ private static final FlowController controller =
Mockito.mock(FlowController.class);
+ private static final FlowManager flowManager =
Mockito.mock(FlowManager.class);
+ static {
+ when(controller.getFlowManager()).thenReturn(flowManager);
+ }
+
+ @Test
+ public void testMinus() {
+ final AffectedComponentSet setA = new AffectedComponentSet(controller);
+ final AffectedComponentSet setB = new AffectedComponentSet(controller);
+
+ for (int i = 0; i < 10; i++) {
+ final ProcessorNode processorNode =
Mockito.mock(ProcessorNode.class);
+ setA.addProcessor(processorNode);
+
+ if (i < 8) {
+ setB.addProcessor(processorNode);
+ }
+ }
+
+ final AffectedComponentSet difference = setA.minus(setB);
+ assertEquals(10, setA.getComponentCount());
+ assertEquals(8, setB.getComponentCount());
+ assertEquals(2, difference.getComponentCount());
+ }
+
+ @Test
+ public void testRemoveComponents() {
+ final AffectedComponentSet setA = new AffectedComponentSet(controller);
+ final List<ProcessorNode> toRemove = new ArrayList<>();
+
+ for (int i = 0; i < 10; i++) {
+ final ProcessorNode processorNode =
Mockito.mock(ProcessorNode.class);
+ setA.addProcessor(processorNode);
+
+ if (i < 8) {
+ toRemove.add(processorNode);
+ }
+ }
+
+ final ComponentSetFilter componentSetFilter =
Mockito.mock(ComponentSetFilter.class);
+ doAnswer(invocation -> {
+ final ProcessorNode toTest = invocation.getArgument(0,
ProcessorNode.class);
+ return toRemove.contains(toTest);
+ }).when(componentSetFilter).testProcessor(any(ProcessorNode.class));
+
+ final AffectedComponentSet removed =
setA.removeComponents(componentSetFilter);
+ assertEquals(2, setA.getComponentCount());
+ assertEquals(8, removed.getComponentCount());
+ }
+}