This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new f55e534422 AMQ-9808: Dead lock when using CRON schedule with AMQ_SCHEDULED_REPEAT (#1536)
f55e534422 is described below

commit f55e5344226019badea57e8353819882a28d0bd5
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Fri Dec 12 10:00:24 2025 +0100

    AMQ-9808: Dead lock when using CRON schedule with AMQ_SCHEDULED_REPEAT (#1536)
    
    * AMQ-9808: Dead lock when using CRON schedule with AMQ_SCHEDULED_REPEAT
    
    * AMQ-9808: Consistency and better exception handling
---
 .../store/kahadb/scheduler/JobSchedulerImpl.java   | 34 ++++++++++++++++++----
 .../broker/scheduler/JmsSchedulerTest.java         | 22 ++++++++------
 2 files changed, 41 insertions(+), 15 deletions(-)

diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
index 29e9e23ee8..25d086e343 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
@@ -268,6 +268,16 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
         this.store.store(update);
     }
 
+    private void doSchedule(final List<Closure> toSchedule) {
+        for (Closure closure : toSchedule) {
+            try {
+                closure.run();
+            } catch (final Exception e) {
+                LOG.warn("Failed to schedule job", e);
+            }
+        }
+    }
+
     private void doRemove(final List<Closure> toRemove) throws IOException {
         for (Closure closure : toRemove) {
             closure.run();
@@ -727,6 +737,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
                 // needed before firing the job event.
                 List<Closure> toRemove = new ArrayList<>();
                 List<Closure> toReschedule = new ArrayList<>();
+                List<Closure> toSchedule = new ArrayList<>();
                 try {
                     this.store.readLockIndex();
 
@@ -776,12 +787,18 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
                                                 // we have a separate schedule to run at this time
                                                 // so the cron job is used to set of a separate schedule
                                                 // hence we won't fire the original cron job to the
-                                                // listeners but we do need to start a separate schedule
-                                                String jobId = ID_GENERATOR.generateId();
-                                                ByteSequence payload = getPayload(job.getLocation());
-                                                schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat());
-                                                waitTime = job.getDelay() != 0 ? job.getDelay() : job.getPeriod();
-                                                this.scheduleTime.setWaitTime(waitTime);
+                                                // listeners, but we do need to start a separate schedule
+                                                toSchedule.add(() -> {
+                                                    try {
+                                                        String jobId = ID_GENERATOR.generateId();
+                                                        ByteSequence payload = getPayload(job.getLocation());
+                                                        schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat());
+                                                    } catch (Exception e) {
+                                                        LOG.warn("Failed to schedule cron follow-up job", e);
+                                                    }
+                                                });
+                                                long wait = job.getDelay() != 0 ? job.getDelay() : job.getPeriod();
+                                                this.scheduleTime.setWaitTime(wait);
                                             }
                                         } else {
                                             toRemove.add(() -> doRemove(executionTime, job.getJobId()));
@@ -797,6 +814,10 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
                 } finally {
                     this.store.readUnlockIndex();
 
+                    // deferred execution of all jobs to be scheduled to avoid deadlock with indexLock
+                    doSchedule(toSchedule);
+
+                    // now reschedule all jobs that need rescheduling
                     doReschedule(toReschedule);
 
                     // now remove all jobs that have not been rescheduled,
@@ -805,6 +826,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
                 }
 
                 this.scheduleTime.pause();
+
             } catch (Exception ioe) {
                 LOG.error("{} Failed to schedule job", this.name, ioe);
                 try {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
index 773713d7d5..4e1a1ce5c5 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
@@ -16,14 +16,6 @@
  */
 package org.apache.activemq.broker.scheduler;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import jakarta.jms.Connection;
 import jakarta.jms.JMSException;
 import jakarta.jms.Message;
@@ -32,7 +24,6 @@ import jakarta.jms.MessageListener;
 import jakarta.jms.MessageProducer;
 import jakarta.jms.Session;
 import jakarta.jms.TextMessage;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ScheduledMessage;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
@@ -48,6 +39,14 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
 public class JmsSchedulerTest extends JobSchedulerTestSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(JmsSchedulerTest.class);
@@ -230,6 +229,11 @@ public class JmsSchedulerTest extends JobSchedulerTestSupport {
                     numberOfDiscardedJobs.incrementAndGet();
                 }
             }
+
+            @Override
+            public boolean isStarted() {
+                return true; // false in DefaultTestAppender so Log4j will discard this appender
+            }
         };
 
         registerLogAppender(appender);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to