This is an automated email from the ASF dual-hosted git repository.

joerghoh pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-event.git


The following commit(s) were added to refs/heads/master by this push:
     new 07826be  SLING-13044 reschedule jobs via dedicated threadpool instead 
of timers (#51)
07826be is described below

commit 07826be8e57c99e2fc7d575ecf5a8dbcce136c7e
Author: Jörg Hoh <[email protected]>
AuthorDate: Mon Feb 9 13:40:20 2026 +0100

    SLING-13044 reschedule jobs via dedicated threadpool instead of timers (#51)
---
 .../sling/event/impl/jobs/queues/JobQueueImpl.java | 46 +++++-------
 .../impl/jobs/queues/JobReschedulingManager.java   | 87 ++++++++++++++++++++++
 .../sling/event/impl/jobs/queues/QueueManager.java |  4 +
 .../event/impl/jobs/queues/QueueServices.java      |  2 +
 .../event/impl/jobs/queues/JobQueueImplTest.java   |  8 ++
 5 files changed, 119 insertions(+), 28 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java 
b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
index d727186..09c4a4d 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
@@ -23,8 +23,6 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -85,6 +83,8 @@ public class JobQueueImpl implements Queue {
 
     private final ThreadPool threadPool;
 
+    private final JobReschedulingManager jobReschedulingManager;
+
     /** Async counter. */
     private final AtomicInteger asyncCounter = new AtomicInteger();
 
@@ -175,17 +175,19 @@ public class JobQueueImpl implements Queue {
             final QueueServices services,
             final QueueJobCache cache,
             final OutdatedJobQueueInfo outdatedQueue) {
+        this.queueName = name;
+        this.configuration = config;
+
         if (config.getOwnThreadPoolSize() > 0) {
             this.threadPool = new 
EventingThreadPool(services.threadPoolManager, config.getOwnThreadPoolSize());
         } else {
             this.threadPool = services.eventingThreadPool;
         }
-        this.queueName = name;
-        this.configuration = config;
         this.services = services;
         this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' 
+ name);
         this.running = true;
         this.cache = cache;
+        this.jobReschedulingManager = services.reschedulingManager;
         this.maxParallel = config.getMaxParallel();
         if (outdatedQueue == null) {
             // queue is created the first time
@@ -799,34 +801,22 @@ public class JobQueueImpl implements Queue {
                 this.isSleepingUntil = fireDate.getTime();
             }
 
-            final Runnable t = new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        if (handler.removeFromRetryList()) {
-                            requeue(handler);
-                        }
-                        waitCounter.decrementAndGet();
-                    } finally {
-                        if (configuration.getType() == Type.ORDERED) {
-                            isSleepingUntil = -1;
-                            cache.setIsBlocked(false);
-                            startJobs();
-                        }
+            final Runnable task = () -> {
+                try {
+                    if (handler.removeFromRetryList()) {
+                        requeue(handler);
+                    }
+                    waitCounter.decrementAndGet();
+                } finally {
+                    if (configuration.getType() == Type.ORDERED) {
+                        isSleepingUntil = -1;
+                        cache.setIsBlocked(false);
+                        startJobs();
                     }
                 }
             };
             this.waitCounter.incrementAndGet();
-            final Timer timer = new Timer();
-            timer.schedule(
-                    new TimerTask() {
-
-                        @Override
-                        public void run() {
-                            t.run();
-                        }
-                    },
-                    delay);
+            this.jobReschedulingManager.reschedule(task, delay);
         } else {
             // put directly into queue
             this.requeue(handler);
diff --git 
a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobReschedulingManager.java
 
b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobReschedulingManager.java
new file mode 100644
index 0000000..ebd1b1b
--- /dev/null
+++ 
b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobReschedulingManager.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A centralized service to manage the delay after which tasks are executed.
+ *
+ * This service is used to reschedule failed jobs, which are submitted to the 
queue again
+ * after the delay has passed. The thread pool that manages that delay can be 
kept quite small,
+ * because re-submitting the jobs into their respective queues is fast.
+ */
+@Component(service = JobReschedulingManager.class)
+@Designate(ocd = JobReschedulingManager.Config.class)
+public class JobReschedulingManager {
+
+    @ObjectClassDefinition(name = "Apache Sling Job Rescheduling Manager")
+    public @interface Config {
+
+        @AttributeDefinition(name = "thread pool size", description = "number 
of threads to execute the rescheduling")
+        public int threadCount() default 1;
+    }
+
+    private static final Logger logger = 
LoggerFactory.getLogger(JobReschedulingManager.class);
+
+    private ScheduledThreadPoolExecutor executor;
+
+    @Activate
+    public void activate(Config config) {
+        executor = new ScheduledThreadPoolExecutor(config.threadCount(), new 
ThreadFactory() {
+            private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread t = new Thread(r, "sling-job-rescheduler-" + 
threadNumber.getAndIncrement());
+                t.setDaemon(true);
+                t.setUncaughtExceptionHandler((Thread thread, Throwable e) ->
+                        logger.error("Thread '" + thread.getName() + "' 
terminated unexpectedly", e));
+                return t;
+            }
+        });
+    }
+
+    @Deactivate
+    public void deactivate() {
+        // pending tasks are meant to be dropped (which is fine here); NOTE(review): 
by default shutdown() still runs already-scheduled delayed tasks - confirm
+        executor.shutdown();
+    }
+
+    /**
+     * Start the provided task after a delay in a fire-and-forget way
+     * @param task the task to execute
+     * @param delayInMilis the delay in milliseconds
+     */
+    public void reschedule(Runnable task, long delayInMilis) {
+        executor.schedule(task, delayInMilis, TimeUnit.MILLISECONDS);
+    }
+}
diff --git 
a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java 
b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
index bed25a1..b010e0d 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
@@ -115,6 +115,9 @@ public class QueueManager implements Runnable, 
EventHandler, ConfigurationChange
     @Reference(policyOption = ReferencePolicyOption.GREEDY)
     private ThreadPoolManager threadPoolManager;
 
+    @Reference(policyOption = ReferencePolicyOption.GREEDY)
+    private JobReschedulingManager jobReschedulingManager;
+
     /**
      * Our thread pool.
      */
@@ -164,6 +167,7 @@ public class QueueManager implements Runnable, 
EventHandler, ConfigurationChange
         queueServices.threadPoolManager = this.threadPoolManager;
         queueServices.statisticsManager = statisticsManager;
         queueServices.eventingThreadPool = this.threadPool;
+        queueServices.reschedulingManager = this.jobReschedulingManager;
         this.configuration.addListener(this);
         logger.info("Apache Sling Queue Manager started on instance {}", 
Environment.APPLICATION_ID);
     }
diff --git 
a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java 
b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
index 31beab2..4250b49 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
@@ -43,4 +43,6 @@ public class QueueServices {
     public StatisticsManager statisticsManager;
 
     public ThreadPool eventingThreadPool;
+
+    public JobReschedulingManager reschedulingManager;
 }
diff --git 
a/src/test/java/org/apache/sling/event/impl/jobs/queues/JobQueueImplTest.java 
b/src/test/java/org/apache/sling/event/impl/jobs/queues/JobQueueImplTest.java
index 132118a..d9c83f0 100644
--- 
a/src/test/java/org/apache/sling/event/impl/jobs/queues/JobQueueImplTest.java
+++ 
b/src/test/java/org/apache/sling/event/impl/jobs/queues/JobQueueImplTest.java
@@ -27,6 +27,7 @@ import org.apache.sling.event.impl.jobs.JobHandler;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
 import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InjectMocks;
@@ -71,6 +72,13 @@ public class JobQueueImplTest {
         jobQueue = new JobQueueImpl(testQueue, internalConfig, services, 
cache, null);
     }
 
+    @After
+    public void tearDown() {
+        if (jobQueue != null) {
+            jobQueue.close();
+        }
+    }
+
     @Test
     public void testStartJobsWhenDisabled() {
         // Add a job handler to the queue

Reply via email to