This is an automated email from the ASF dual-hosted git repository.

royteeuwen pushed a commit to branch SLING-12349
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-event.git

commit 631b203e2eaddac472ea75f343aa449b8a707205
Author: Roy Teeuwen <[email protected]>
AuthorDate: Tue Jul 16 08:32:56 2024 +0200

    Add processor based calculation for the eventing threadpool
---
 .../sling/event/impl/EventingThreadPool.java       | 21 ++++++--
 .../sling/event/impl/ProcessorBasedCalculator.java | 54 +++++++++++++++++++++
 .../jobs/config/InternalQueueConfiguration.java    | 22 +--------
 .../event/impl/ProcessorBasedCalculatorTest.java   | 56 ++++++++++++++++++++++
 .../config/InternalQueueConfigurationTest.java     | 22 ++-------
 5 files changed, 132 insertions(+), 43 deletions(-)

diff --git a/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java 
b/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
index 0cf1531..a37d6c8 100644
--- a/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
+++ b/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
@@ -33,6 +33,8 @@ import 
org.osgi.service.component.annotations.ReferencePolicyOption;
 import org.osgi.service.metatype.annotations.AttributeDefinition;
 import org.osgi.service.metatype.annotations.Designate;
 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -53,11 +55,19 @@ public class EventingThreadPool implements ThreadPool {
     public @interface Config {
 
         @AttributeDefinition(name = "Pool Size",
-              description="The size of the thread pool. This pool is used to 
execute jobs and therefore "
-                        + "limits the maximum number of jobs executed in 
parallel.")
-        int minPoolSize() default 35;
+                description = "The size of the thread pool. This pool is used 
to execute jobs and therefore "
+                        + "limits the maximum number of jobs executed in 
parallel. "
+                        + "A value of -1 is substituted with the number of 
available processors. "
+                        + "A decimal number between 0.0 and 1.0 is treated as 
a fraction of available processors. "
+                        + "For example 0.5 means half of the available 
processors. ")
+        double minPoolSize() default 35;
     }
 
+    /**
+     * Logger.
+     */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
     @Reference(policyOption=ReferencePolicyOption.GREEDY)
     private ThreadPoolManager threadPoolManager;
 
@@ -86,9 +96,10 @@ public class EventingThreadPool implements ThreadPool {
         this.configure(config.minPoolSize());
     }
 
-    private void configure(final int maxPoolSize) {
+    private void configure(final double maxPoolSize) {
+        final int poolSize = ProcessorBasedCalculator.calculate(logger, 
maxPoolSize);
         final ModifiableThreadPoolConfig config = new 
ModifiableThreadPoolConfig();
-        config.setMinPoolSize(maxPoolSize);
+        config.setMinPoolSize(poolSize);
         config.setMaxPoolSize(config.getMinPoolSize());
         config.setQueueSize(-1); // unlimited
         config.setShutdownGraceful(true);
diff --git 
a/src/main/java/org/apache/sling/event/impl/ProcessorBasedCalculator.java 
b/src/main/java/org/apache/sling/event/impl/ProcessorBasedCalculator.java
new file mode 100644
index 0000000..233fc41
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/ProcessorBasedCalculator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.slf4j.Logger;
+
+public abstract class ProcessorBasedCalculator {
+
+    /**
+     * Float values are treated as percentage.  int values are treated as 
number of cores, -1 == all available
+     * Note: the value is based on the core count at startup.  It will not 
change dynamically if core count changes.
+     *
+     * @param input The input, being either -1, between 0 and 1 or 1+
+     * @return input of -1 will return the amount of cores, between 0 and 1 
will return a percentage of the cores, 1 or higher returns the same number
+     */
+    public static int calculate(final Logger logger, final double input) {
+        int cores = ConfigurationConstants.NUMBER_OF_PROCESSORS;
+        int result = cores;
+        logger.debug("Input for processor based calculation {}", input);
+        if ((input == Math.floor(input)) && !Double.isInfinite(input)) {
+            // integral type
+            if ((int) input == 0) {
+                logger.warn("Input set to zero.");
+            }
+            result = (input <= -1 ? cores : (int) input);
+        } else {
+            // percentage (rounded)
+            if ((input > 0.0) && (input < 1.0)) {
+                result = (int) Math.round(cores * input);
+            } else {
+                logger.warn("Invalid input {}, defaulting to cores {}.", 
input, cores);
+            }
+        }
+        logger.debug("Result {}", result);
+        return result;
+    }
+}
diff --git 
a/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
 
b/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
index 537dd05..20e254b 100644
--- 
a/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
+++ 
b/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.sling.event.impl.jobs.config;
 import java.util.Arrays;
 import java.util.Map;
 
+import org.apache.sling.event.impl.ProcessorBasedCalculator;
 import org.apache.sling.event.impl.support.TopicMatcher;
 import org.apache.sling.event.impl.support.TopicMatcherHelper;
 import org.apache.sling.event.jobs.QueueConfiguration;
@@ -205,26 +206,7 @@ public class InternalQueueConfiguration
         this.retries = config.queue_retries();
         this.retryDelay = config.queue_retrydelay();
 
-        // Float values are treated as percentage.  int values are treated as 
number of cores, -1 == all available
-        // Note: the value is based on the core count at startup.  It will not 
change dynamically if core count changes.
-        int cores = ConfigurationConstants.NUMBER_OF_PROCESSORS;
-        final double inMaxParallel = config.queue_maxparallel();
-        logger.debug("Max parallel for queue {} is {}", this.name, 
inMaxParallel);
-        if ((inMaxParallel == Math.floor(inMaxParallel)) && 
!Double.isInfinite(inMaxParallel)) {
-            // integral type
-            if ((int) inMaxParallel == 0) {
-                logger.warn("Max threads property for {} set to zero.", 
this.name);
-            }
-            this.maxParallelProcesses = (inMaxParallel <= -1 ? cores : (int) 
inMaxParallel);
-        } else {
-            // percentage (rounded)
-            if ((inMaxParallel > 0.0) && (inMaxParallel < 1.0)) {
-                this.maxParallelProcesses = (int) Math.round(cores * 
inMaxParallel);
-            } else {
-                logger.warn("Invalid queue max parallel value for queue {}. 
Using {}", this.name, cores);
-                this.maxParallelProcesses =  cores;
-            }
-        }
+        this.maxParallelProcesses = ProcessorBasedCalculator.calculate(logger, 
config.queue_maxparallel());
         logger.debug("Thread pool size for {} was set to {}", this.name, 
this.maxParallelProcesses);
 
         // ignore parallel setting for ordered queues
diff --git 
a/src/test/java/org/apache/sling/event/impl/ProcessorBasedCalculatorTest.java 
b/src/test/java/org/apache/sling/event/impl/ProcessorBasedCalculatorTest.java
new file mode 100644
index 0000000..8172335
--- /dev/null
+++ 
b/src/test/java/org/apache/sling/event/impl/ProcessorBasedCalculatorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+
+public class ProcessorBasedCalculatorTest {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Test
+    public void testCalculate() {
+        assertEquals(Runtime.getRuntime().availableProcessors(), 
ProcessorBasedCalculator.calculate(logger, -1));
+
+        // Edge cases 0.0 and 1.0 (treated as int numbers)
+        assertEquals(0, ProcessorBasedCalculator.calculate(logger, 0.0));
+
+        assertEquals(1, ProcessorBasedCalculator.calculate(logger, 1.0));
+
+        // percentage (50%)
+        assertEquals((int) 
Math.round(Runtime.getRuntime().availableProcessors() * 0.5), 
ProcessorBasedCalculator.calculate(logger, 0.5));
+
+        // rounding
+        assertEquals((int) 
Math.round(Runtime.getRuntime().availableProcessors() * 0.9), 
ProcessorBasedCalculator.calculate(logger, 0.90));
+
+        assertEquals((int) 
Math.round(Runtime.getRuntime().availableProcessors() * 0.99), 
ProcessorBasedCalculator.calculate(logger, 0.99));
+
+        // Percentages can't go over 99% (0.99)
+        assertEquals(Runtime.getRuntime().availableProcessors(), 
ProcessorBasedCalculator.calculate(logger, 1.01));
+
+        // Treat negative values same a -1 (all cores)
+        assertEquals(Runtime.getRuntime().availableProcessors(), 
ProcessorBasedCalculator.calculate(logger, -0.5));
+
+        assertEquals(Runtime.getRuntime().availableProcessors(), 
ProcessorBasedCalculator.calculate(logger, -2));
+    }
+}
diff --git 
a/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
 
b/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
index b6aa5e4..1bfbbd3 100644
--- 
a/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
+++ 
b/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
@@ -122,7 +122,7 @@ public class InternalQueueConfigurationTest {
         };
     }
 
-    @org.junit.Test public void testMaxParallel() {
+    @org.junit.Test public void testMaxParallelUsesProcessorBasedCalculator() {
         InternalQueueConfiguration c = 
InternalQueueConfiguration.fromConfiguration(Collections.<String, 
Object>emptyMap(), createConfig(-1));
         assertEquals(Runtime.getRuntime().availableProcessors(), 
c.getMaxParallel());
 
@@ -137,23 +137,9 @@ public class InternalQueueConfigurationTest {
         c = InternalQueueConfiguration.fromConfiguration(Collections.<String, 
Object>emptyMap(), createConfig(0.5));
         assertEquals((int) 
Math.round(Runtime.getRuntime().availableProcessors() * 0.5), 
c.getMaxParallel());
 
-        // rounding
-        c = InternalQueueConfiguration.fromConfiguration(Collections.<String, 
Object>emptyMap(), createConfig(0.90));
-        assertEquals((int) 
Math.round(Runtime.getRuntime().availableProcessors() * 0.9), 
c.getMaxParallel());
-
-        c = InternalQueueConfiguration.fromConfiguration(Collections.<String, 
Object>emptyMap(), createConfig(0.99));
-        assertEquals((int) 
Math.round(Runtime.getRuntime().availableProcessors() * 0.99), 
c.getMaxParallel());
-
-        // Percentages can't go over 99% (0.99)
-        c = InternalQueueConfiguration.fromConfiguration(Collections.<String, 
Object>emptyMap(), createConfig(1.01));
-        assertEquals(Runtime.getRuntime().availableProcessors(), 
c.getMaxParallel());
-
-        // Treat negative values same a -1 (all cores)
-        c = InternalQueueConfiguration.fromConfiguration(Collections.<String, 
Object>emptyMap(), createConfig(-0.5));
-        assertEquals(Runtime.getRuntime().availableProcessors(), 
c.getMaxParallel());
-
-        c = InternalQueueConfiguration.fromConfiguration(Collections.<String, 
Object>emptyMap(), createConfig(-2));
-        assertEquals(Runtime.getRuntime().availableProcessors(), 
c.getMaxParallel());
+        // specific amount
+        c = InternalQueueConfiguration.fromConfiguration(Collections.<String, 
Object>emptyMap(), createConfig(5));
+        assertEquals((int) 5, c.getMaxParallel());
     }
 
     @org.junit.Test public void testTopicMatchersDot() {

Reply via email to