This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new f55e534422 AMQ-9808: Dead lock when using CRON schedule with
AMQ_SCHEDULED_REPEAT (#1536)
f55e534422 is described below
commit f55e5344226019badea57e8353819882a28d0bd5
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Fri Dec 12 10:00:24 2025 +0100
AMQ-9808: Dead lock when using CRON schedule with AMQ_SCHEDULED_REPEAT
(#1536)
* AMQ-9808: Dead lock when using CRON schedule with AMQ_SCHEDULED_REPEAT
* AMQ-9808: Consistency and better exception handling
---
.../store/kahadb/scheduler/JobSchedulerImpl.java | 34 ++++++++++++++++++----
.../broker/scheduler/JmsSchedulerTest.java | 22 ++++++++------
2 files changed, 41 insertions(+), 15 deletions(-)
diff --git
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
index 29e9e23ee8..25d086e343 100644
---
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
+++
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
@@ -268,6 +268,16 @@ public class JobSchedulerImpl extends ServiceSupport
implements Runnable, JobSch
this.store.store(update);
}
+ private void doSchedule(final List<Closure> toSchedule) {
+ for (Closure closure : toSchedule) {
+ try {
+ closure.run();
+ } catch (final Exception e) {
+ LOG.warn("Failed to schedule job", e);
+ }
+ }
+ }
+
private void doRemove(final List<Closure> toRemove) throws IOException {
for (Closure closure : toRemove) {
closure.run();
@@ -727,6 +737,7 @@ public class JobSchedulerImpl extends ServiceSupport
implements Runnable, JobSch
// needed before firing the job event.
List<Closure> toRemove = new ArrayList<>();
List<Closure> toReschedule = new ArrayList<>();
+ List<Closure> toSchedule = new ArrayList<>();
try {
this.store.readLockIndex();
@@ -776,12 +787,18 @@ public class JobSchedulerImpl extends ServiceSupport
implements Runnable, JobSch
// we have a separate schedule
to run at this time
// so the cron job is used to
set of a separate schedule
// hence we won't fire the
original cron job to the
- // listeners but we do need to
start a separate schedule
- String jobId =
ID_GENERATOR.generateId();
- ByteSequence payload =
getPayload(job.getLocation());
- schedule(jobId, payload, "",
job.getDelay(), job.getPeriod(), job.getRepeat());
- waitTime = job.getDelay() != 0
? job.getDelay() : job.getPeriod();
-
this.scheduleTime.setWaitTime(waitTime);
+ // listeners, but we do need
to start a separate schedule
+ toSchedule.add(() -> {
+ try {
+ String jobId =
ID_GENERATOR.generateId();
+ ByteSequence payload =
getPayload(job.getLocation());
+ schedule(jobId,
payload, "", job.getDelay(), job.getPeriod(), job.getRepeat());
+ } catch (Exception e) {
+ LOG.warn("Failed to
schedule cron follow-up job", e);
+ }
+ });
+ long wait = job.getDelay() !=
0 ? job.getDelay() : job.getPeriod();
+
this.scheduleTime.setWaitTime(wait);
}
} else {
toRemove.add(() ->
doRemove(executionTime, job.getJobId()));
@@ -797,6 +814,10 @@ public class JobSchedulerImpl extends ServiceSupport
implements Runnable, JobSch
} finally {
this.store.readUnlockIndex();
+ // deferred execution of all jobs to be scheduled to avoid
deadlock with indexLock
+ doSchedule(toSchedule);
+
+ // now reschedule all jobs that need rescheduling
doReschedule(toReschedule);
// now remove all jobs that have not been rescheduled,
@@ -805,6 +826,7 @@ public class JobSchedulerImpl extends ServiceSupport
implements Runnable, JobSch
}
this.scheduleTime.pause();
+
} catch (Exception ioe) {
LOG.error("{} Failed to schedule job", this.name, ioe);
try {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
index 773713d7d5..4e1a1ce5c5 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
@@ -16,14 +16,6 @@
*/
package org.apache.activemq.broker.scheduler;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
@@ -32,7 +24,6 @@ import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.store.kahadb.disk.journal.Location;
@@ -48,6 +39,14 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
public class JmsSchedulerTest extends JobSchedulerTestSupport {
private static final Logger LOG =
LoggerFactory.getLogger(JmsSchedulerTest.class);
@@ -230,6 +229,11 @@ public class JmsSchedulerTest extends
JobSchedulerTestSupport {
numberOfDiscardedJobs.incrementAndGet();
}
}
+
+ @Override
+ public boolean isStarted() {
+ return true; // false in DefaultTestAppender so Log4j will
discard this appender
+ }
};
registerLogAppender(appender);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact