Repository: incubator-rocketmq Updated Branches: refs/heads/master ac8941b03 -> 155823ef7
[ROCKETMQ-145][HOTFIX] Resolve concurrent issue in HAService and GroupCommitService Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/155823ef Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/155823ef Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/155823ef Branch: refs/heads/master Commit: 155823ef75314fba328200bd0a1a7aec9f2003d2 Parents: ac8941b Author: yukon <[email protected]> Authored: Sat Mar 18 15:34:39 2017 +0800 Committer: yukon <[email protected]> Committed: Sat Mar 18 15:38:58 2017 +0800 ---------------------------------------------------------------------- .../org/apache/rocketmq/store/CommitLog.java | 54 ++++++++++---------- .../org/apache/rocketmq/store/ha/HAService.java | 36 +++++++------ 2 files changed, 47 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/155823ef/store/src/main/java/org/apache/rocketmq/store/CommitLog.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index b4bf298..3bbe675 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -995,12 +995,12 @@ public class CommitLog { private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>(); - public void putRequest(final GroupCommitRequest request) { - synchronized (this) { + public synchronized void putRequest(final GroupCommitRequest request) { + synchronized (this.requestsWrite) { this.requestsWrite.add(request); - if (hasNotified.compareAndSet(false, true)) { - 
waitPoint.countDown(); // notify - } + } + if (hasNotified.compareAndSet(false, true)) { + waitPoint.countDown(); // notify } } @@ -1011,32 +1011,34 @@ public class CommitLog { } private void doCommit() { - if (!this.requestsRead.isEmpty()) { - for (GroupCommitRequest req : this.requestsRead) { - // There may be a message in the next file, so a maximum of - // two times the flush - boolean flushOK = false; - for (int i = 0; i < 2 && !flushOK; i++) { - flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); - - if (!flushOK) { - CommitLog.this.mappedFileQueue.flush(0); + synchronized (this.requestsRead) { + if (!this.requestsRead.isEmpty()) { + for (GroupCommitRequest req : this.requestsRead) { + // There may be a message in the next file, so a maximum of + // two times the flush + boolean flushOK = false; + for (int i = 0; i < 2 && !flushOK; i++) { + flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); + + if (!flushOK) { + CommitLog.this.mappedFileQueue.flush(0); + } } + + req.wakeupCustomer(flushOK); } - req.wakeupCustomer(flushOK); - } + long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); + if (storeTimestamp > 0) { + CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); + } - long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); - if (storeTimestamp > 0) { - CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); + this.requestsRead.clear(); + } else { + // Because of individual messages is set to not sync flush, it + // will come to this process + CommitLog.this.mappedFileQueue.flush(0); } - - this.requestsRead.clear(); - } else { - // Because of individual messages is set to not sync flush, it - // will come to this process - CommitLog.this.mappedFileQueue.flush(0); } } 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/155823ef/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java index 762bdb6..f507b36 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java @@ -253,12 +253,12 @@ public class HAService { private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new ArrayList<>(); private volatile List<CommitLog.GroupCommitRequest> requestsRead = new ArrayList<>(); - public void putRequest(final CommitLog.GroupCommitRequest request) { - synchronized (this) { + public synchronized void putRequest(final CommitLog.GroupCommitRequest request) { + synchronized (this.requestsWrite) { this.requestsWrite.add(request); - if (hasNotified.compareAndSet(false, true)) { - waitPoint.countDown(); // notify - } + } + if (hasNotified.compareAndSet(false, true)) { + waitPoint.countDown(); // notify } } @@ -273,22 +273,24 @@ public class HAService { } private void doWaitTransfer() { - if (!this.requestsRead.isEmpty()) { - for (CommitLog.GroupCommitRequest req : this.requestsRead) { - boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); - for (int i = 0; !transferOK && i < 5; i++) { - this.notifyTransferObject.waitForRunning(1000); - transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); - } + synchronized (this.requestsRead) { + if (!this.requestsRead.isEmpty()) { + for (CommitLog.GroupCommitRequest req : this.requestsRead) { + boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); + for (int i = 0; !transferOK && i < 5; i++) { + this.notifyTransferObject.waitForRunning(1000); + transferOK = HAService.this.push2SlaveMaxOffset.get() 
>= req.getNextOffset(); + } - if (!transferOK) { - log.warn("transfer messsage to slave timeout, " + req.getNextOffset()); + if (!transferOK) { + log.warn("transfer messsage to slave timeout, " + req.getNextOffset()); + } + + req.wakeupCustomer(transferOK); } - req.wakeupCustomer(transferOK); + this.requestsRead.clear(); } - - this.requestsRead.clear(); } }
