This is an automated email from the ASF dual-hosted git repository.
joerghoh pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-event.git
The following commit(s) were added to refs/heads/master by this push:
new 07826be SLING-13044 reschedule jobs via dedicated threadpool instead
of timers (#51)
07826be is described below
commit 07826be8e57c99e2fc7d575ecf5a8dbcce136c7e
Author: Jörg Hoh <[email protected]>
AuthorDate: Mon Feb 9 13:40:20 2026 +0100
SLING-13044 reschedule jobs via dedicated threadpool instead of timers (#51)
---
.../sling/event/impl/jobs/queues/JobQueueImpl.java | 46 +++++-------
.../impl/jobs/queues/JobReschedulingManager.java | 87 ++++++++++++++++++++++
.../sling/event/impl/jobs/queues/QueueManager.java | 4 +
.../event/impl/jobs/queues/QueueServices.java | 2 +
.../event/impl/jobs/queues/JobQueueImplTest.java | 8 ++
5 files changed, 119 insertions(+), 28 deletions(-)
diff --git
a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
index d727186..09c4a4d 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
@@ -23,8 +23,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -85,6 +83,8 @@ public class JobQueueImpl implements Queue {
private final ThreadPool threadPool;
+ private final JobReschedulingManager jobReschedulingManager;
+
/** Async counter. */
private final AtomicInteger asyncCounter = new AtomicInteger();
@@ -175,17 +175,19 @@ public class JobQueueImpl implements Queue {
final QueueServices services,
final QueueJobCache cache,
final OutdatedJobQueueInfo outdatedQueue) {
+ this.queueName = name;
+ this.configuration = config;
+
if (config.getOwnThreadPoolSize() > 0) {
this.threadPool = new
EventingThreadPool(services.threadPoolManager, config.getOwnThreadPoolSize());
} else {
this.threadPool = services.eventingThreadPool;
}
- this.queueName = name;
- this.configuration = config;
this.services = services;
this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.'
+ name);
this.running = true;
this.cache = cache;
+ this.jobReschedulingManager = services.reschedulingManager;
this.maxParallel = config.getMaxParallel();
if (outdatedQueue == null) {
// queue is created the first time
@@ -799,34 +801,22 @@ public class JobQueueImpl implements Queue {
this.isSleepingUntil = fireDate.getTime();
}
- final Runnable t = new Runnable() {
- @Override
- public void run() {
- try {
- if (handler.removeFromRetryList()) {
- requeue(handler);
- }
- waitCounter.decrementAndGet();
- } finally {
- if (configuration.getType() == Type.ORDERED) {
- isSleepingUntil = -1;
- cache.setIsBlocked(false);
- startJobs();
- }
+ final Runnable task = () -> {
+ try {
+ if (handler.removeFromRetryList()) {
+ requeue(handler);
+ }
+ waitCounter.decrementAndGet();
+ } finally {
+ if (configuration.getType() == Type.ORDERED) {
+ isSleepingUntil = -1;
+ cache.setIsBlocked(false);
+ startJobs();
}
}
};
this.waitCounter.incrementAndGet();
- final Timer timer = new Timer();
- timer.schedule(
- new TimerTask() {
-
- @Override
- public void run() {
- t.run();
- }
- },
- delay);
+ this.jobReschedulingManager.reschedule(task, delay);
} else {
// put directly into queue
this.requeue(handler);
diff --git
a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobReschedulingManager.java
b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobReschedulingManager.java
new file mode 100644
index 0000000..ebd1b1b
--- /dev/null
+++
b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobReschedulingManager.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A centralized service to manage the delay after which tasks are executed.
+ *
+ * This service is used to reschedule failed jobs, which are submitted to the
queue again
+ * after the delay has passed. For that reason the threadpool to manage that
delay is quite small,
+ * as the re-submission of the jobs into their respective queues is fast.
+ */
+@Component(service = JobReschedulingManager.class)
+@Designate(ocd = JobReschedulingManager.Config.class)
+public class JobReschedulingManager {
+
+ @ObjectClassDefinition(name = "Apache Sling Job Rescheduling Manager")
+ public @interface Config {
+
+ @AttributeDefinition(name = "thread pool size", description = "number
of threads to execute the rescheduling")
+ public int threadCount() default 1;
+ }
+
+ private static final Logger logger =
LoggerFactory.getLogger(JobReschedulingManager.class);
+
+ private ScheduledThreadPoolExecutor executor;
+
+ @Activate
+ public void activate(Config config) {
+ executor = new ScheduledThreadPoolExecutor(config.threadCount(), new
ThreadFactory() {
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+ @Override
+ public Thread newThread(final Runnable r) {
+ final Thread t = new Thread(r, "sling-job-rescheduler-" +
threadNumber.getAndIncrement());
+ t.setDaemon(true);
+ t.setUncaughtExceptionHandler((Thread thread, Throwable e) ->
+ logger.error("Thread '" + thread.getName() + "'
terminated unexpectedly", e));
+ return t;
+ }
+ });
+ }
+
+ @Deactivate
+ public void deactivate() {
+ // submitted tasks which are not yet executed will be dropped (which
is fine here)
+ executor.shutdown();
+ }
+
+ /**
+ * Start the provided task with a delay in a fire and forgot way
+ * @param task the task to execute
+ * @param delayInMilis the delay in milliseconds
+ */
+ public void reschedule(Runnable task, long delayInMilis) {
+ executor.schedule(task, delayInMilis, TimeUnit.MILLISECONDS);
+ }
+}
diff --git
a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
index bed25a1..b010e0d 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
@@ -115,6 +115,9 @@ public class QueueManager implements Runnable,
EventHandler, ConfigurationChange
@Reference(policyOption = ReferencePolicyOption.GREEDY)
private ThreadPoolManager threadPoolManager;
+ @Reference(policyOption = ReferencePolicyOption.GREEDY)
+ private JobReschedulingManager jobReschedulingManager;
+
/**
* Our thread pool.
*/
@@ -164,6 +167,7 @@ public class QueueManager implements Runnable,
EventHandler, ConfigurationChange
queueServices.threadPoolManager = this.threadPoolManager;
queueServices.statisticsManager = statisticsManager;
queueServices.eventingThreadPool = this.threadPool;
+ queueServices.reschedulingManager = this.jobReschedulingManager;
this.configuration.addListener(this);
logger.info("Apache Sling Queue Manager started on instance {}",
Environment.APPLICATION_ID);
}
diff --git
a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
index 31beab2..4250b49 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
@@ -43,4 +43,6 @@ public class QueueServices {
public StatisticsManager statisticsManager;
public ThreadPool eventingThreadPool;
+
+ public JobReschedulingManager reschedulingManager;
}
diff --git
a/src/test/java/org/apache/sling/event/impl/jobs/queues/JobQueueImplTest.java
b/src/test/java/org/apache/sling/event/impl/jobs/queues/JobQueueImplTest.java
index 132118a..d9c83f0 100644
---
a/src/test/java/org/apache/sling/event/impl/jobs/queues/JobQueueImplTest.java
+++
b/src/test/java/org/apache/sling/event/impl/jobs/queues/JobQueueImplTest.java
@@ -27,6 +27,7 @@ import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InjectMocks;
@@ -71,6 +72,13 @@ public class JobQueueImplTest {
jobQueue = new JobQueueImpl(testQueue, internalConfig, services,
cache, null);
}
+ @After
+ public void tearDown() {
+ if (jobQueue != null) {
+ jobQueue.close();
+ }
+ }
+
@Test
public void testStartJobsWhenDisabled() {
// Add a job handler to the queue