Repository: activemq Updated Branches: refs/heads/master 2e64abc38 -> c8a6171d0
https://issues.apache.org/jira/browse/AMQ-6288 Switching the checkpoint lock to a readlock when forwarding acks to prevent other journal updates from being blocked. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c8a6171d Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c8a6171d Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c8a6171d Branch: refs/heads/master Commit: c8a6171d04303da18d9e19ded30146643d7cbad6 Parents: 2e64abc Author: Christopher L. Shannon (cshannon) <[email protected]> Authored: Thu May 12 14:37:51 2016 +0000 Committer: Christopher L. Shannon (cshannon) <[email protected]> Committed: Thu May 12 14:39:12 2016 +0000 ---------------------------------------------------------------------- .../activemq/store/kahadb/MessageDatabase.java | 28 +++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/c8a6171d/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 21530c2..92310a8 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1888,6 +1888,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe int journalToAdvance = -1; Set<Integer> journalLogsReferenced = new HashSet<Integer>(); + //flag to know whether the ack forwarding completed without an exception + boolean forwarded = false; + try { //acquire the checkpoint lock to prevent other threads from //running a checkpoint while this is running @@ -1903,7 +1906,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe //In the future it might be better to just remove the checkpointLock entirely //and only use the executor but this would need to be examined for any unintended //consequences - checkpointLock.writeLock().lock(); + checkpointLock.readLock().lock(); try { @@ -1937,18 +1940,29 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe try { // Background rewrite of the old acks forwardAllAcks(journalToAdvance, journalLogsReferenced); - - // Checkpoint with changes from the ackMessageFileMap - checkpointUpdate(false); + forwarded = true; } catch (IOException ioe) { - LOG.error("Checkpoint failed", ioe); + LOG.error("Forwarding of acks failed", ioe); brokerService.handleIOException(ioe); } catch (Throwable e) { - LOG.error("Checkpoint failed", e); + LOG.error("Forwarding of acks failed", e); brokerService.handleIOException(IOExceptionSupport.create(e)); } } finally { - checkpointLock.writeLock().unlock(); + checkpointLock.readLock().unlock(); + } + + try { + if (forwarded) { + // Checkpoint with changes from the ackMessageFileMap + checkpointUpdate(false); + } + } catch (IOException ioe) { + LOG.error("Checkpoint failed", ioe); + brokerService.handleIOException(ioe); + } catch (Throwable e) { + LOG.error("Checkpoint failed", e); + brokerService.handleIOException(IOExceptionSupport.create(e)); } } }
