This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d65778f  [ISSUE #2883] [Part B] Improve produce performance in M/S mode.  (#2885)
d65778f is described below

commit d65778f3a83b2daf49cecf270a6bac166ad6f9b8
Author: huangli <[email protected]>
AuthorDate: Tue Jul 6 21:46:14 2021 +0800

    [ISSUE #2883] [Part B] Improve produce performance in M/S mode.  (#2885)
    
    * Optimise lock in WaitNotifyObject
    
    * Remove lock in HAService
    
    * Remove lock in GroupCommitService
---
 .../java/org/apache/rocketmq/store/CommitLog.java  | 63 +++++++++++----------
 .../org/apache/rocketmq/store/ha/HAService.java    | 53 ++++++++++--------
 .../apache/rocketmq/store/ha/WaitNotifyObject.java | 64 ++++++++++++----------
 3 files changed, 99 insertions(+), 81 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index cce6481..43b01f0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -19,8 +19,8 @@ package org.apache.rocketmq.store;
 import java.net.Inet6Address;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -1403,48 +1403,55 @@ public class CommitLog {
      * GroupCommit Service
      */
     class GroupCommitService extends FlushCommitLogService {
-        private volatile List<GroupCommitRequest> requestsWrite = new 
ArrayList<GroupCommitRequest>();
-        private volatile List<GroupCommitRequest> requestsRead = new 
ArrayList<GroupCommitRequest>();
+        private volatile LinkedList<GroupCommitRequest> requestsWrite = new 
LinkedList<GroupCommitRequest>();
+        private volatile LinkedList<GroupCommitRequest> requestsRead = new 
LinkedList<GroupCommitRequest>();
+        private final PutMessageSpinLock lock = new PutMessageSpinLock();
 
         public synchronized void putRequest(final GroupCommitRequest request) {
-            synchronized (this.requestsWrite) {
+            lock.lock();
+            try {
                 this.requestsWrite.add(request);
+            } finally {
+                lock.unlock();
             }
             this.wakeup();
         }
 
         private void swapRequests() {
-            List<GroupCommitRequest> tmp = this.requestsWrite;
-            this.requestsWrite = this.requestsRead;
-            this.requestsRead = tmp;
+            lock.lock();
+            try {
+                LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
+                this.requestsWrite = this.requestsRead;
+                this.requestsRead = tmp;
+            } finally {
+                lock.unlock();
+            }
         }
 
         private void doCommit() {
-            synchronized (this.requestsRead) {
-                if (!this.requestsRead.isEmpty()) {
-                    for (GroupCommitRequest req : this.requestsRead) {
-                        // There may be a message in the next file, so a 
maximum of
-                        // two times the flush
-                        boolean flushOK = 
CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
-                        for (int i = 0; i < 2 && !flushOK; i++) {
-                            CommitLog.this.mappedFileQueue.flush(0);
-                            flushOK = 
CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
-                        }
-
-                        req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : 
PutMessageStatus.FLUSH_DISK_TIMEOUT);
+            if (!this.requestsRead.isEmpty()) {
+                for (GroupCommitRequest req : this.requestsRead) {
+                    // There may be a message in the next file, so a maximum of
+                    // two times the flush
+                    boolean flushOK = 
CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
+                    for (int i = 0; i < 2 && !flushOK; i++) {
+                        CommitLog.this.mappedFileQueue.flush(0);
+                        flushOK = 
CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                     }
 
-                    long storeTimestamp = 
CommitLog.this.mappedFileQueue.getStoreTimestamp();
-                    if (storeTimestamp > 0) {
-                        
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
-                    }
+                    req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : 
PutMessageStatus.FLUSH_DISK_TIMEOUT);
+                }
 
-                    this.requestsRead.clear();
-                } else {
-                    // Because of individual messages is set to not sync 
flush, it
-                    // will come to this process
-                    CommitLog.this.mappedFileQueue.flush(0);
+                long storeTimestamp = 
CommitLog.this.mappedFileQueue.getStoreTimestamp();
+                if (storeTimestamp > 0) {
+                    
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                 }
+
+                this.requestsRead = new LinkedList<>();
+            } else {
+                // Because of individual messages is set to not sync flush, it
+                // will come to this process
+                CommitLog.this.mappedFileQueue.flush(0);
             }
         }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java 
b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 34c51eb..d4d4109 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -25,7 +25,6 @@ import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
@@ -39,6 +38,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.store.CommitLog;
 import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.PutMessageSpinLock;
 import org.apache.rocketmq.store.PutMessageStatus;
 
 public class HAService {
@@ -254,12 +254,16 @@ public class HAService {
     class GroupTransferService extends ServiceThread {
 
         private final WaitNotifyObject notifyTransferObject = new 
WaitNotifyObject();
-        private volatile List<CommitLog.GroupCommitRequest> requestsWrite = 
new ArrayList<>();
-        private volatile List<CommitLog.GroupCommitRequest> requestsRead = new 
ArrayList<>();
+        private final PutMessageSpinLock lock = new PutMessageSpinLock();
+        private volatile LinkedList<CommitLog.GroupCommitRequest> 
requestsWrite = new LinkedList<>();
+        private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead 
= new LinkedList<>();
 
-        public synchronized void putRequest(final CommitLog.GroupCommitRequest 
request) {
-            synchronized (this.requestsWrite) {
+        public void putRequest(final CommitLog.GroupCommitRequest request) {
+            lock.lock();
+            try {
                 this.requestsWrite.add(request);
+            } finally {
+                lock.unlock();
             }
             this.wakeup();
         }
@@ -269,32 +273,35 @@ public class HAService {
         }
 
         private void swapRequests() {
-            List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
-            this.requestsWrite = this.requestsRead;
-            this.requestsRead = tmp;
+            lock.lock();
+            try {
+                LinkedList<CommitLog.GroupCommitRequest> tmp = 
this.requestsWrite;
+                this.requestsWrite = this.requestsRead;
+                this.requestsRead = tmp;
+            } finally {
+                lock.unlock();
+            }
         }
 
         private void doWaitTransfer() {
-            synchronized (this.requestsRead) {
-                if (!this.requestsRead.isEmpty()) {
-                    for (CommitLog.GroupCommitRequest req : this.requestsRead) 
{
-                        boolean transferOK = 
HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
-                        long waitUntilWhen = 
HAService.this.defaultMessageStore.getSystemClock().now()
+            if (!this.requestsRead.isEmpty()) {
+                for (CommitLog.GroupCommitRequest req : this.requestsRead) {
+                    boolean transferOK = 
HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
+                    long waitUntilWhen = 
HAService.this.defaultMessageStore.getSystemClock().now()
                             + 
HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
-                        while (!transferOK && 
HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
-                            this.notifyTransferObject.waitForRunning(1000);
-                            transferOK = 
HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
-                        }
-
-                        if (!transferOK) {
-                            log.warn("transfer messsage to slave timeout, " + 
req.getNextOffset());
-                        }
+                    while (!transferOK && 
HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
+                        this.notifyTransferObject.waitForRunning(1000);
+                        transferOK = HAService.this.push2SlaveMaxOffset.get() 
>= req.getNextOffset();
+                    }
 
-                        req.wakeupCustomer(transferOK ? 
PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
+                    if (!transferOK) {
+                        log.warn("transfer messsage to slave timeout, " + 
req.getNextOffset());
                     }
 
-                    this.requestsRead.clear();
+                    req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : 
PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                 }
+
+                this.requestsRead = new LinkedList<>();
             }
         }
 
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java 
b/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
index 75b7597..d5ed65f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
@@ -20,40 +20,43 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class WaitNotifyObject {
     private static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 
-    protected final HashMap<Long/* thread id */, Boolean/* notified */> 
waitingThreadTable =
-        new HashMap<Long, Boolean>(16);
+    protected final ConcurrentHashMap<Long/* thread id */, AtomicBoolean/* 
notified */> waitingThreadTable =
+        new ConcurrentHashMap<Long, AtomicBoolean>(16);
 
-    protected volatile boolean hasNotified = false;
+    protected AtomicBoolean hasNotified = new AtomicBoolean(false);
 
     public void wakeup() {
-        synchronized (this) {
-            if (!this.hasNotified) {
-                this.hasNotified = true;
+        boolean needNotify = hasNotified.compareAndSet(false, true);
+        if (needNotify) {
+            synchronized (this) {
                 this.notify();
             }
         }
     }
 
     protected void waitForRunning(long interval) {
+        if (this.hasNotified.compareAndSet(true, false)) {
+            this.onWaitEnd();
+            return;
+        }
         synchronized (this) {
-            if (this.hasNotified) {
-                this.hasNotified = false;
-                this.onWaitEnd();
-                return;
-            }
-
             try {
+                if (this.hasNotified.compareAndSet(true, false)) {
+                    this.onWaitEnd();
+                    return;
+                }
                 this.wait(interval);
             } catch (InterruptedException e) {
                 log.error("Interrupted", e);
             } finally {
-                this.hasNotified = false;
+                this.hasNotified.set(false);
                 this.onWaitEnd();
             }
         }
@@ -63,15 +66,14 @@ public class WaitNotifyObject {
     }
 
     public void wakeupAll() {
-        synchronized (this) {
-            boolean needNotify = false;
-
-            for (Map.Entry<Long,Boolean> entry : 
this.waitingThreadTable.entrySet()) {
-                needNotify = needNotify || !entry.getValue();
-                entry.setValue(true);
+        boolean needNotify = false;
+        for (Map.Entry<Long,AtomicBoolean> entry : 
this.waitingThreadTable.entrySet()) {
+            if (entry.getValue().compareAndSet(false, true)) {
+                needNotify = true;
             }
-
-            if (needNotify) {
+        }
+        if (needNotify) {
+            synchronized (this) {
                 this.notifyAll();
             }
         }
@@ -79,20 +81,22 @@ public class WaitNotifyObject {
 
     public void allWaitForRunning(long interval) {
         long currentThreadId = Thread.currentThread().getId();
+        AtomicBoolean notified = 
this.waitingThreadTable.computeIfAbsent(currentThreadId, k -> new 
AtomicBoolean(false));
+        if (notified.compareAndSet(true, false)) {
+            this.onWaitEnd();
+            return;
+        }
         synchronized (this) {
-            Boolean notified = this.waitingThreadTable.get(currentThreadId);
-            if (notified != null && notified) {
-                this.waitingThreadTable.put(currentThreadId, false);
-                this.onWaitEnd();
-                return;
-            }
-
             try {
+                if (notified.compareAndSet(true, false)) {
+                    this.onWaitEnd();
+                    return;
+                }
                 this.wait(interval);
             } catch (InterruptedException e) {
                 log.error("Interrupted", e);
             } finally {
-                this.waitingThreadTable.put(currentThreadId, false);
+                notified.set(false);
                 this.onWaitEnd();
             }
         }

Reply via email to