This is an automated email from the ASF dual-hosted git repository. royteeuwen pushed a commit to branch SLING-12349 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-event.git
commit 631b203e2eaddac472ea75f343aa449b8a707205 Author: Roy Teeuwen <[email protected]> AuthorDate: Tue Jul 16 08:32:56 2024 +0200 Add processor based calculation for the eventing threadpool --- .../sling/event/impl/EventingThreadPool.java | 21 ++++++-- .../sling/event/impl/ProcessorBasedCalculator.java | 54 +++++++++++++++++++++ .../jobs/config/InternalQueueConfiguration.java | 22 +-------- .../event/impl/ProcessorBasedCalculatorTest.java | 56 ++++++++++++++++++++++ .../config/InternalQueueConfigurationTest.java | 22 ++------- 5 files changed, 132 insertions(+), 43 deletions(-) diff --git a/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java b/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java index 0cf1531..a37d6c8 100644 --- a/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java +++ b/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java @@ -33,6 +33,8 @@ import org.osgi.service.component.annotations.ReferencePolicyOption; import org.osgi.service.metatype.annotations.AttributeDefinition; import org.osgi.service.metatype.annotations.Designate; import org.osgi.service.metatype.annotations.ObjectClassDefinition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -53,11 +55,19 @@ public class EventingThreadPool implements ThreadPool { public @interface Config { @AttributeDefinition(name = "Pool Size", - description="The size of the thread pool. This pool is used to execute jobs and therefore " - + "limits the maximum number of jobs executed in parallel.") - int minPoolSize() default 35; + description = "The size of the thread pool. This pool is used to execute jobs and therefore " + + "limits the maximum number of jobs executed in parallel. " + + "A value of -1 is substituted with the number of available processors. " + + "A decimal number between 0.0 and 1.0 is treated as a fraction of available processors. " + + "For example 0.5 means half of the available processors. ") + double minPoolSize() default 35; } + /** + * Logger. + */ + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + @Reference(policyOption=ReferencePolicyOption.GREEDY) private ThreadPoolManager threadPoolManager; @@ -86,9 +96,10 @@ public class EventingThreadPool implements ThreadPool { this.configure(config.minPoolSize()); } - private void configure(final int maxPoolSize) { + private void configure(final double maxPoolSize) { + final int poolSize = ProcessorBasedCalculator.calculate(logger, maxPoolSize); final ModifiableThreadPoolConfig config = new ModifiableThreadPoolConfig(); - config.setMinPoolSize(maxPoolSize); + config.setMinPoolSize(poolSize); config.setMaxPoolSize(config.getMinPoolSize()); config.setQueueSize(-1); // unlimited config.setShutdownGraceful(true); diff --git a/src/main/java/org/apache/sling/event/impl/ProcessorBasedCalculator.java b/src/main/java/org/apache/sling/event/impl/ProcessorBasedCalculator.java new file mode 100644 index 0000000..233fc41 --- /dev/null +++ b/src/main/java/org/apache/sling/event/impl/ProcessorBasedCalculator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.impl; + +import org.apache.sling.event.impl.jobs.config.ConfigurationConstants; +import org.slf4j.Logger; + +public abstract class ProcessorBasedCalculator { + + /** + * Float values are treated as percentage. int values are treated as number of cores, -1 == all available + * Note: the value is based on the core count at startup. It will not change dynamically if core count changes. + * + * @param input The input, being either -1, between 0 and 1 or 1+ + * @return input of -1 will return the amount of cores, between 0 and 1 will return a percentage of the cores, 1 or higher returns the same number + */ + public static int calculate(final Logger logger, final double input) { + int cores = ConfigurationConstants.NUMBER_OF_PROCESSORS; + int result = cores; + logger.debug("Input for processor based calculation {}", input); + if ((input == Math.floor(input)) && !Double.isInfinite(input)) { + // integral type + if ((int) input == 0) { + logger.warn("Input set to zero."); + } + result = (input <= -1 ? cores : (int) input); + } else { + // percentage (rounded) + if ((input > 0.0) && (input < 1.0)) { + result = (int) Math.round(cores * input); + } else { + logger.warn("Invalid input {}, defaulting to cores {}.", input, cores); + } + } + logger.debug("Result {}", result); + return result; + } +} diff --git a/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java b/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java index 537dd05..20e254b 100644 --- a/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java +++ b/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java @@ -21,6 +21,7 @@ package org.apache.sling.event.impl.jobs.config; import java.util.Arrays; import java.util.Map; +import org.apache.sling.event.impl.ProcessorBasedCalculator; import org.apache.sling.event.impl.support.TopicMatcher; import org.apache.sling.event.impl.support.TopicMatcherHelper; import org.apache.sling.event.jobs.QueueConfiguration; @@ -205,26 +206,7 @@ public class InternalQueueConfiguration this.retries = config.queue_retries(); this.retryDelay = config.queue_retrydelay(); - // Float values are treated as percentage. int values are treated as number of cores, -1 == all available - // Note: the value is based on the core count at startup. It will not change dynamically if core count changes. - int cores = ConfigurationConstants.NUMBER_OF_PROCESSORS; - final double inMaxParallel = config.queue_maxparallel(); - logger.debug("Max parallel for queue {} is {}", this.name, inMaxParallel); - if ((inMaxParallel == Math.floor(inMaxParallel)) && !Double.isInfinite(inMaxParallel)) { - // integral type - if ((int) inMaxParallel == 0) { - logger.warn("Max threads property for {} set to zero.", this.name); - } - this.maxParallelProcesses = (inMaxParallel <= -1 ? cores : (int) inMaxParallel); - } else { - // percentage (rounded) - if ((inMaxParallel > 0.0) && (inMaxParallel < 1.0)) { - this.maxParallelProcesses = (int) Math.round(cores * inMaxParallel); - } else { - logger.warn("Invalid queue max parallel value for queue {}. Using {}", this.name, cores); - this.maxParallelProcesses = cores; - } - } + this.maxParallelProcesses = ProcessorBasedCalculator.calculate(logger, config.queue_maxparallel()); logger.debug("Thread pool size for {} was set to {}", this.name, this.maxParallelProcesses); // ignore parallel setting for ordered queues diff --git a/src/test/java/org/apache/sling/event/impl/ProcessorBasedCalculatorTest.java b/src/test/java/org/apache/sling/event/impl/ProcessorBasedCalculatorTest.java new file mode 100644 index 0000000..8172335 --- /dev/null +++ b/src/test/java/org/apache/sling/event/impl/ProcessorBasedCalculatorTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.impl; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; + +public class ProcessorBasedCalculatorTest { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + @Test + public void testCalculate() { + assertEquals(Runtime.getRuntime().availableProcessors(), ProcessorBasedCalculator.calculate(logger, -1)); + + // Edge cases 0.0 and 1.0 (treated as int numbers) + assertEquals(0, ProcessorBasedCalculator.calculate(logger, 0.0)); + + assertEquals(1, ProcessorBasedCalculator.calculate(logger, 1.0)); + + // percentage (50%) + assertEquals((int) Math.round(Runtime.getRuntime().availableProcessors() * 0.5), ProcessorBasedCalculator.calculate(logger, 0.5)); + + // rounding + assertEquals((int) Math.round(Runtime.getRuntime().availableProcessors() * 0.9), ProcessorBasedCalculator.calculate(logger, 0.90)); + + assertEquals((int) Math.round(Runtime.getRuntime().availableProcessors() * 0.99), ProcessorBasedCalculator.calculate(logger, 0.99)); + + // Percentages can't go over 99% (0.99) + assertEquals(Runtime.getRuntime().availableProcessors(), ProcessorBasedCalculator.calculate(logger, 1.01)); + + // Treat negative values same a -1 (all cores) + assertEquals(Runtime.getRuntime().availableProcessors(), ProcessorBasedCalculator.calculate(logger, -0.5)); + + assertEquals(Runtime.getRuntime().availableProcessors(), ProcessorBasedCalculator.calculate(logger, -2)); + } +} diff --git a/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java b/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java index b6aa5e4..1bfbbd3 100644 --- a/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java +++ b/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java @@ -122,7 +122,7 @@ public class InternalQueueConfigurationTest { }; } - @org.junit.Test public void testMaxParallel() { + @org.junit.Test public void testMaxParallelUsesProcessorBasedCalculator() { InternalQueueConfiguration c = InternalQueueConfiguration.fromConfiguration(Collections.<String, Object>emptyMap(), createConfig(-1)); assertEquals(Runtime.getRuntime().availableProcessors(), c.getMaxParallel()); @@ -137,23 +137,9 @@ public class InternalQueueConfigurationTest { c = InternalQueueConfiguration.fromConfiguration(Collections.<String, Object>emptyMap(), createConfig(0.5)); assertEquals((int) Math.round(Runtime.getRuntime().availableProcessors() * 0.5), c.getMaxParallel()); - // rounding - c = InternalQueueConfiguration.fromConfiguration(Collections.<String, Object>emptyMap(), createConfig(0.90)); - assertEquals((int) Math.round(Runtime.getRuntime().availableProcessors() * 0.9), c.getMaxParallel()); - - c = InternalQueueConfiguration.fromConfiguration(Collections.<String, Object>emptyMap(), createConfig(0.99)); - assertEquals((int) Math.round(Runtime.getRuntime().availableProcessors() * 0.99), c.getMaxParallel()); - - // Percentages can't go over 99% (0.99) - c = InternalQueueConfiguration.fromConfiguration(Collections.<String, Object>emptyMap(), createConfig(1.01)); - assertEquals(Runtime.getRuntime().availableProcessors(), c.getMaxParallel()); - - // Treat negative values same a -1 (all cores) - c = InternalQueueConfiguration.fromConfiguration(Collections.<String, Object>emptyMap(), createConfig(-0.5)); - assertEquals(Runtime.getRuntime().availableProcessors(), c.getMaxParallel()); - - c = InternalQueueConfiguration.fromConfiguration(Collections.<String, Object>emptyMap(), createConfig(-2)); - assertEquals(Runtime.getRuntime().availableProcessors(), c.getMaxParallel()); + // specific amount + c = InternalQueueConfiguration.fromConfiguration(Collections.<String, Object>emptyMap(), createConfig(5)); + assertEquals((int) 5, c.getMaxParallel()); } @org.junit.Test public void testTopicMatchersDot() {
