This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 88d2c03b37 NIFI-11245 Corrected reducing Maximum Thread Count while running (#7005)
88d2c03b37 is described below
commit 88d2c03b37d6cdabd260658c84e4305b4fa14c99
Author: exceptionfactory <[email protected]>
AuthorDate: Fri Mar 3 10:35:34 2023 -0600
NIFI-11245 Corrected reducing Maximum Thread Count while running (#7005)
- Corrected implementation to allow reducing Maximum Thread Count below the default of 10
---
.../org/apache/nifi/controller/FlowController.java | 19 +++++++++++++------
.../apache/nifi/controller/TestFlowController.java | 21 ++++++++++++++++++---
2 files changed, 31 insertions(+), 9 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index ce93d4a474..376bdcc1f9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1567,7 +1567,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
public void setMaxTimerDrivenThreadCount(final int maxThreadCount) {
writeLock.lock();
try {
- setMaxThreadCount(maxThreadCount, this.timerDrivenEngineRef.get(), this.maxTimerDrivenThreads);
+ setMaxThreadCount(maxThreadCount, "Timer Driven", this.timerDrivenEngineRef.get(), this.maxTimerDrivenThreads);
} finally {
writeLock.unlock("setMaxTimerDrivenThreadCount");
}
@@ -1576,7 +1576,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
public void setMaxEventDrivenThreadCount(final int maxThreadCount) {
writeLock.lock();
try {
- setMaxThreadCount(maxThreadCount, this.eventDrivenEngineRef.get(), this.maxEventDrivenThreads);
+ setMaxThreadCount(maxThreadCount, "Event Driven", this.eventDrivenEngineRef.get(), this.maxEventDrivenThreads);
processScheduler.setMaxThreadCount(SchedulingStrategy.EVENT_DRIVEN, maxThreadCount);
} finally {
writeLock.unlock("setMaxEventDrivenThreadCount");
@@ -1587,16 +1587,23 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
* Updates the number of threads that can be simultaneously used for executing processors.
* This method must be called while holding the write lock!
*
- * @param maxThreadCount max number of threads
+ * @param maxThreadCount Requested new thread pool size
+ * @param poolName Thread Pool Name
+ * @param engine Flow Engine executor or null when terminated
+ * @param maxThreads Internal tracker for Maximum Threads
*/
- private void setMaxThreadCount(final int maxThreadCount, final FlowEngine engine, final AtomicInteger maxThreads) {
+ private void setMaxThreadCount(final int maxThreadCount, final String poolName, final FlowEngine engine, final AtomicInteger maxThreads) {
if (maxThreadCount < 1) {
throw new IllegalArgumentException("Cannot set max number of threads to less than 1");
}
maxThreads.getAndSet(maxThreadCount);
- if (null != engine && engine.getCorePoolSize() < maxThreadCount) {
- engine.setCorePoolSize(maxThreads.intValue());
+ if (engine == null) {
+ LOG.debug("[{}] Engine not found: Maximum Thread Count not updated", poolName);
+ } else {
+ final int previousCorePoolSize = engine.getCorePoolSize();
+ engine.setCorePoolSize(maxThreadCount);
+ LOG.info("[{}] Maximum Thread Count updated [{}] previous [{}]", poolName, maxThreadCount, previousCorePoolSize);
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
index d6711cf6df..acc52c9cb9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
@@ -53,7 +53,6 @@ import org.apache.nifi.controller.service.mock.ServiceA;
import org.apache.nifi.controller.service.mock.ServiceB;
import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.encrypt.PropertyEncryptor;
-import org.apache.nifi.encrypt.PropertyEncryptorFactory;
import org.apache.nifi.flow.ComponentType;
import org.apache.nifi.flow.Position;
import org.apache.nifi.flow.VersionedProcessGroup;
@@ -129,6 +128,9 @@ import static org.mockito.Mockito.when;
public class TestFlowController {
+ private static final int INITIAL_MAX_TIMER_DRIVEN_THREAD_COUNT = 10;
+ private static final int REQUESTED_MAX_TIMER_DRIVEN_THREAD_COUNT = 2;
+
private FlowController controller;
private AbstractPolicyBasedAuthorizer authorizer;
private FlowFileEventRepository flowFileEventRepo;
@@ -160,7 +162,7 @@ public class TestFlowController {
otherProps.put("nifi.remote.input.secure", "");
final String propsFile = "src/test/resources/flowcontrollertest.nifi.properties";
nifiProperties = NiFiProperties.createBasicNiFiProperties(propsFile, otherProps);
- encryptor = PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties);
+ encryptor = mock(PropertyEncryptor.class);
// use the system bundle
systemBundle = SystemBundle.create(nifiProperties);
@@ -1355,11 +1357,24 @@ public class TestFlowController {
assertEquals(0, componentCounts.get("Public Output Ports").intValue());
}
+ @Test
+ public void testMaxTimerDrivenThreadCount() {
+ final int startingCount = controller.getMaxTimerDrivenThreadCount();
+
+ assertEquals(INITIAL_MAX_TIMER_DRIVEN_THREAD_COUNT, startingCount);
+
+ controller.setMaxTimerDrivenThreadCount(REQUESTED_MAX_TIMER_DRIVEN_THREAD_COUNT);
+ assertEquals(REQUESTED_MAX_TIMER_DRIVEN_THREAD_COUNT, controller.getMaxTimerDrivenThreadCount());
+
+ controller.setMaxTimerDrivenThreadCount(INITIAL_MAX_TIMER_DRIVEN_THREAD_COUNT);
+ assertEquals(INITIAL_MAX_TIMER_DRIVEN_THREAD_COUNT, controller.getMaxTimerDrivenThreadCount());
+ }
+
private String getNewJsonFlow() throws JsonProcessingException {
final VersionedDataflow versionedDataflow = new VersionedDataflow();
versionedDataflow.setEncodingVersion(new VersionedFlowEncodingVersion(2, 0));
- versionedDataflow.setMaxTimerDrivenThreadCount(10);
+ versionedDataflow.setMaxTimerDrivenThreadCount(INITIAL_MAX_TIMER_DRIVEN_THREAD_COUNT);
versionedDataflow.setRegistries(Collections.emptyList());
versionedDataflow.setParameterContexts(Collections.emptyList());
versionedDataflow.setControllerServices(Collections.emptyList());