Repository: incubator-rocketmq
Updated Branches:
  refs/heads/master ac8941b03 -> 155823ef7


[ROCKETMQ-145][HOTFIX] Resolve concureent issue in HAService and 
GroupCommitService


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/155823ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/155823ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/155823ef

Branch: refs/heads/master
Commit: 155823ef75314fba328200bd0a1a7aec9f2003d2
Parents: ac8941b
Author: yukon <[email protected]>
Authored: Sat Mar 18 15:34:39 2017 +0800
Committer: yukon <[email protected]>
Committed: Sat Mar 18 15:38:58 2017 +0800

----------------------------------------------------------------------
 .../org/apache/rocketmq/store/CommitLog.java    | 54 ++++++++++----------
 .../org/apache/rocketmq/store/ha/HAService.java | 36 +++++++------
 2 files changed, 47 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/155823ef/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index b4bf298..3bbe675 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -995,12 +995,12 @@ public class CommitLog {
         private volatile List<GroupCommitRequest> requestsWrite = new 
ArrayList<GroupCommitRequest>();
         private volatile List<GroupCommitRequest> requestsRead = new 
ArrayList<GroupCommitRequest>();
 
-        public void putRequest(final GroupCommitRequest request) {
-            synchronized (this) {
+        public synchronized void putRequest(final GroupCommitRequest request) {
+            synchronized (this.requestsWrite) {
                 this.requestsWrite.add(request);
-                if (hasNotified.compareAndSet(false, true)) {
-                    waitPoint.countDown(); // notify
-                }
+            }
+            if (hasNotified.compareAndSet(false, true)) {
+                waitPoint.countDown(); // notify
             }
         }
 
@@ -1011,32 +1011,34 @@ public class CommitLog {
         }
 
         private void doCommit() {
-            if (!this.requestsRead.isEmpty()) {
-                for (GroupCommitRequest req : this.requestsRead) {
-                    // There may be a message in the next file, so a maximum of
-                    // two times the flush
-                    boolean flushOK = false;
-                    for (int i = 0; i < 2 && !flushOK; i++) {
-                        flushOK = 
CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
-
-                        if (!flushOK) {
-                            CommitLog.this.mappedFileQueue.flush(0);
+            synchronized (this.requestsRead) {
+                if (!this.requestsRead.isEmpty()) {
+                    for (GroupCommitRequest req : this.requestsRead) {
+                        // There may be a message in the next file, so a 
maximum of
+                        // two times the flush
+                        boolean flushOK = false;
+                        for (int i = 0; i < 2 && !flushOK; i++) {
+                            flushOK = 
CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
+
+                            if (!flushOK) {
+                                CommitLog.this.mappedFileQueue.flush(0);
+                            }
                         }
+
+                        req.wakeupCustomer(flushOK);
                     }
 
-                    req.wakeupCustomer(flushOK);
-                }
+                    long storeTimestamp = 
CommitLog.this.mappedFileQueue.getStoreTimestamp();
+                    if (storeTimestamp > 0) {
+                        
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
+                    }
 
-                long storeTimestamp = 
CommitLog.this.mappedFileQueue.getStoreTimestamp();
-                if (storeTimestamp > 0) {
-                    
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
+                    this.requestsRead.clear();
+                } else {
+                    // Because of individual messages is set to not sync 
flush, it
+                    // will come to this process
+                    CommitLog.this.mappedFileQueue.flush(0);
                 }
-
-                this.requestsRead.clear();
-            } else {
-                // Because of individual messages is set to not sync flush, it
-                // will come to this process
-                CommitLog.this.mappedFileQueue.flush(0);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/155823ef/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java 
b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 762bdb6..f507b36 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -253,12 +253,12 @@ public class HAService {
         private volatile List<CommitLog.GroupCommitRequest> requestsWrite = 
new ArrayList<>();
         private volatile List<CommitLog.GroupCommitRequest> requestsRead = new 
ArrayList<>();
 
-        public void putRequest(final CommitLog.GroupCommitRequest request) {
-            synchronized (this) {
+        public synchronized void putRequest(final CommitLog.GroupCommitRequest 
request) {
+            synchronized (this.requestsWrite) {
                 this.requestsWrite.add(request);
-                if (hasNotified.compareAndSet(false, true)) {
-                    waitPoint.countDown(); // notify
-                }
+            }
+            if (hasNotified.compareAndSet(false, true)) {
+                waitPoint.countDown(); // notify
             }
         }
 
@@ -273,22 +273,24 @@ public class HAService {
         }
 
         private void doWaitTransfer() {
-            if (!this.requestsRead.isEmpty()) {
-                for (CommitLog.GroupCommitRequest req : this.requestsRead) {
-                    boolean transferOK = 
HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
-                    for (int i = 0; !transferOK && i < 5; i++) {
-                        this.notifyTransferObject.waitForRunning(1000);
-                        transferOK = HAService.this.push2SlaveMaxOffset.get() 
>= req.getNextOffset();
-                    }
+            synchronized (this.requestsRead) {
+                if (!this.requestsRead.isEmpty()) {
+                    for (CommitLog.GroupCommitRequest req : this.requestsRead) 
{
+                        boolean transferOK = 
HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
+                        for (int i = 0; !transferOK && i < 5; i++) {
+                            this.notifyTransferObject.waitForRunning(1000);
+                            transferOK = 
HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
+                        }
 
-                    if (!transferOK) {
-                        log.warn("transfer messsage to slave timeout, " + 
req.getNextOffset());
+                        if (!transferOK) {
+                            log.warn("transfer messsage to slave timeout, " + 
req.getNextOffset());
+                        }
+
+                        req.wakeupCustomer(transferOK);
                     }
 
-                    req.wakeupCustomer(transferOK);
+                    this.requestsRead.clear();
                 }
-
-                this.requestsRead.clear();
             }
         }
 

Reply via email to