This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 88d2c03b37 NIFI-11245 Corrected reducing Maximum Thread Count while running (#7005)
88d2c03b37 is described below

commit 88d2c03b37d6cdabd260658c84e4305b4fa14c99
Author: exceptionfactory <[email protected]>
AuthorDate: Fri Mar 3 10:35:34 2023 -0600

    NIFI-11245 Corrected reducing Maximum Thread Count while running (#7005)
    
    - Corrected implementation to allow reducing Maximum Thread Count below the default of 10
---
 .../org/apache/nifi/controller/FlowController.java  | 19 +++++++++++++------
 .../apache/nifi/controller/TestFlowController.java  | 21 ++++++++++++++++++---
 2 files changed, 31 insertions(+), 9 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index ce93d4a474..376bdcc1f9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1567,7 +1567,7 @@ public class FlowController implements 
ReportingTaskProvider, Authorizable, Node
     public void setMaxTimerDrivenThreadCount(final int maxThreadCount) {
         writeLock.lock();
         try {
-            setMaxThreadCount(maxThreadCount, this.timerDrivenEngineRef.get(), 
this.maxTimerDrivenThreads);
+            setMaxThreadCount(maxThreadCount, "Timer Driven", 
this.timerDrivenEngineRef.get(), this.maxTimerDrivenThreads);
         } finally {
             writeLock.unlock("setMaxTimerDrivenThreadCount");
         }
@@ -1576,7 +1576,7 @@ public class FlowController implements 
ReportingTaskProvider, Authorizable, Node
     public void setMaxEventDrivenThreadCount(final int maxThreadCount) {
         writeLock.lock();
         try {
-            setMaxThreadCount(maxThreadCount, this.eventDrivenEngineRef.get(), 
this.maxEventDrivenThreads);
+            setMaxThreadCount(maxThreadCount, "Event Driven", 
this.eventDrivenEngineRef.get(), this.maxEventDrivenThreads);
             
processScheduler.setMaxThreadCount(SchedulingStrategy.EVENT_DRIVEN, 
maxThreadCount);
         } finally {
             writeLock.unlock("setMaxEventDrivenThreadCount");
@@ -1587,16 +1587,23 @@ public class FlowController implements 
ReportingTaskProvider, Authorizable, Node
      * Updates the number of threads that can be simultaneously used for 
executing processors.
      * This method must be called while holding the write lock!
      *
-     * @param maxThreadCount max number of threads
+     * @param maxThreadCount Requested new thread pool size
+     * @param poolName Thread Pool Name
+     * @param engine Flow Engine executor or null when terminated
+     * @param maxThreads Internal tracker for Maximum Threads
      */
-    private void setMaxThreadCount(final int maxThreadCount, final FlowEngine 
engine, final AtomicInteger maxThreads) {
+    private void setMaxThreadCount(final int maxThreadCount, final String 
poolName, final FlowEngine engine, final AtomicInteger maxThreads) {
         if (maxThreadCount < 1) {
             throw new IllegalArgumentException("Cannot set max number of 
threads to less than 1");
         }
 
         maxThreads.getAndSet(maxThreadCount);
-        if (null != engine && engine.getCorePoolSize() < maxThreadCount) {
-            engine.setCorePoolSize(maxThreads.intValue());
+        if (engine == null) {
+            LOG.debug("[{}] Engine not found: Maximum Thread Count not 
updated", poolName);
+        } else {
+            final int previousCorePoolSize = engine.getCorePoolSize();
+            engine.setCorePoolSize(maxThreadCount);
+            LOG.info("[{}] Maximum Thread Count updated [{}] previous [{}]", 
poolName, maxThreadCount, previousCorePoolSize);
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
index d6711cf6df..acc52c9cb9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
@@ -53,7 +53,6 @@ import org.apache.nifi.controller.service.mock.ServiceA;
 import org.apache.nifi.controller.service.mock.ServiceB;
 import org.apache.nifi.controller.status.history.StatusHistoryRepository;
 import org.apache.nifi.encrypt.PropertyEncryptor;
-import org.apache.nifi.encrypt.PropertyEncryptorFactory;
 import org.apache.nifi.flow.ComponentType;
 import org.apache.nifi.flow.Position;
 import org.apache.nifi.flow.VersionedProcessGroup;
@@ -129,6 +128,9 @@ import static org.mockito.Mockito.when;
 
 public class TestFlowController {
 
+    private static final int INITIAL_MAX_TIMER_DRIVEN_THREAD_COUNT = 10;
+    private static final int REQUESTED_MAX_TIMER_DRIVEN_THREAD_COUNT = 2;
+
     private FlowController controller;
     private AbstractPolicyBasedAuthorizer authorizer;
     private FlowFileEventRepository flowFileEventRepo;
@@ -160,7 +162,7 @@ public class TestFlowController {
         otherProps.put("nifi.remote.input.secure", "");
         final String propsFile = 
"src/test/resources/flowcontrollertest.nifi.properties";
         nifiProperties = NiFiProperties.createBasicNiFiProperties(propsFile, 
otherProps);
-        encryptor = 
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties);
+        encryptor = mock(PropertyEncryptor.class);
 
         // use the system bundle
         systemBundle = SystemBundle.create(nifiProperties);
@@ -1355,11 +1357,24 @@ public class TestFlowController {
         assertEquals(0, componentCounts.get("Public Output Ports").intValue());
     }
 
+    @Test
+    public void testMaxTimerDrivenThreadCount() {
+        final int startingCount = controller.getMaxTimerDrivenThreadCount();
+
+        assertEquals(INITIAL_MAX_TIMER_DRIVEN_THREAD_COUNT, startingCount);
+
+        
controller.setMaxTimerDrivenThreadCount(REQUESTED_MAX_TIMER_DRIVEN_THREAD_COUNT);
+        assertEquals(REQUESTED_MAX_TIMER_DRIVEN_THREAD_COUNT, 
controller.getMaxTimerDrivenThreadCount());
+
+        
controller.setMaxTimerDrivenThreadCount(INITIAL_MAX_TIMER_DRIVEN_THREAD_COUNT);
+        assertEquals(INITIAL_MAX_TIMER_DRIVEN_THREAD_COUNT, 
controller.getMaxTimerDrivenThreadCount());
+    }
+
     private String getNewJsonFlow() throws JsonProcessingException {
         final VersionedDataflow versionedDataflow = new VersionedDataflow();
 
         versionedDataflow.setEncodingVersion(new 
VersionedFlowEncodingVersion(2, 0));
-        versionedDataflow.setMaxTimerDrivenThreadCount(10);
+        
versionedDataflow.setMaxTimerDrivenThreadCount(INITIAL_MAX_TIMER_DRIVEN_THREAD_COUNT);
         versionedDataflow.setRegistries(Collections.emptyList());
         versionedDataflow.setParameterContexts(Collections.emptyList());
         versionedDataflow.setControllerServices(Collections.emptyList());

Reply via email to