This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d65778f [ISSUE #2883] [Part B] Improve produce performance in M/S mode. (#2885)
d65778f is described below
commit d65778f3a83b2daf49cecf270a6bac166ad6f9b8
Author: huangli <[email protected]>
AuthorDate: Tue Jul 6 21:46:14 2021 +0800
[ISSUE #2883] [Part B] Improve produce performance in M/S mode. (#2885)
* Optimise lock in WaitNotifyObject
* Remove lock in HAService
* Remove lock in GroupCommitService
---
.../java/org/apache/rocketmq/store/CommitLog.java | 63 +++++++++++----------
.../org/apache/rocketmq/store/ha/HAService.java | 53 ++++++++++--------
.../apache/rocketmq/store/ha/WaitNotifyObject.java | 64 ++++++++++++----------
3 files changed, 99 insertions(+), 81 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index cce6481..43b01f0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -19,8 +19,8 @@ package org.apache.rocketmq.store;
import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -1403,48 +1403,55 @@ public class CommitLog {
* GroupCommit Service
*/
class GroupCommitService extends FlushCommitLogService {
- private volatile List<GroupCommitRequest> requestsWrite = new
ArrayList<GroupCommitRequest>();
- private volatile List<GroupCommitRequest> requestsRead = new
ArrayList<GroupCommitRequest>();
+ private volatile LinkedList<GroupCommitRequest> requestsWrite = new
LinkedList<GroupCommitRequest>();
+ private volatile LinkedList<GroupCommitRequest> requestsRead = new
LinkedList<GroupCommitRequest>();
+ private final PutMessageSpinLock lock = new PutMessageSpinLock();
public synchronized void putRequest(final GroupCommitRequest request) {
- synchronized (this.requestsWrite) {
+ lock.lock();
+ try {
this.requestsWrite.add(request);
+ } finally {
+ lock.unlock();
}
this.wakeup();
}
private void swapRequests() {
- List<GroupCommitRequest> tmp = this.requestsWrite;
- this.requestsWrite = this.requestsRead;
- this.requestsRead = tmp;
+ lock.lock();
+ try {
+ LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
+ this.requestsWrite = this.requestsRead;
+ this.requestsRead = tmp;
+ } finally {
+ lock.unlock();
+ }
}
private void doCommit() {
- synchronized (this.requestsRead) {
- if (!this.requestsRead.isEmpty()) {
- for (GroupCommitRequest req : this.requestsRead) {
- // There may be a message in the next file, so a
maximum of
- // two times the flush
- boolean flushOK =
CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
- for (int i = 0; i < 2 && !flushOK; i++) {
- CommitLog.this.mappedFileQueue.flush(0);
- flushOK =
CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
- }
-
- req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK :
PutMessageStatus.FLUSH_DISK_TIMEOUT);
+ if (!this.requestsRead.isEmpty()) {
+ for (GroupCommitRequest req : this.requestsRead) {
+ // There may be a message in the next file, so a maximum of
+ // two times the flush
+ boolean flushOK =
CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
+ for (int i = 0; i < 2 && !flushOK; i++) {
+ CommitLog.this.mappedFileQueue.flush(0);
+ flushOK =
CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
}
- long storeTimestamp =
CommitLog.this.mappedFileQueue.getStoreTimestamp();
- if (storeTimestamp > 0) {
-
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
- }
+ req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK :
PutMessageStatus.FLUSH_DISK_TIMEOUT);
+ }
- this.requestsRead.clear();
- } else {
- // Because of individual messages is set to not sync
flush, it
- // will come to this process
- CommitLog.this.mappedFileQueue.flush(0);
+ long storeTimestamp =
CommitLog.this.mappedFileQueue.getStoreTimestamp();
+ if (storeTimestamp > 0) {
+
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
+
+ this.requestsRead = new LinkedList<>();
+ } else {
+ // Because of individual messages is set to not sync flush, it
+ // will come to this process
+ CommitLog.this.mappedFileQueue.flush(0);
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 34c51eb..d4d4109 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -25,7 +25,6 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
@@ -39,6 +38,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.PutMessageSpinLock;
import org.apache.rocketmq.store.PutMessageStatus;
public class HAService {
@@ -254,12 +254,16 @@ public class HAService {
class GroupTransferService extends ServiceThread {
private final WaitNotifyObject notifyTransferObject = new
WaitNotifyObject();
- private volatile List<CommitLog.GroupCommitRequest> requestsWrite =
new ArrayList<>();
- private volatile List<CommitLog.GroupCommitRequest> requestsRead = new
ArrayList<>();
+ private final PutMessageSpinLock lock = new PutMessageSpinLock();
+ private volatile LinkedList<CommitLog.GroupCommitRequest>
requestsWrite = new LinkedList<>();
+ private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead
= new LinkedList<>();
- public synchronized void putRequest(final CommitLog.GroupCommitRequest
request) {
- synchronized (this.requestsWrite) {
+ public void putRequest(final CommitLog.GroupCommitRequest request) {
+ lock.lock();
+ try {
this.requestsWrite.add(request);
+ } finally {
+ lock.unlock();
}
this.wakeup();
}
@@ -269,32 +273,35 @@ public class HAService {
}
private void swapRequests() {
- List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
- this.requestsWrite = this.requestsRead;
- this.requestsRead = tmp;
+ lock.lock();
+ try {
+ LinkedList<CommitLog.GroupCommitRequest> tmp =
this.requestsWrite;
+ this.requestsWrite = this.requestsRead;
+ this.requestsRead = tmp;
+ } finally {
+ lock.unlock();
+ }
}
private void doWaitTransfer() {
- synchronized (this.requestsRead) {
- if (!this.requestsRead.isEmpty()) {
- for (CommitLog.GroupCommitRequest req : this.requestsRead)
{
- boolean transferOK =
HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
- long waitUntilWhen =
HAService.this.defaultMessageStore.getSystemClock().now()
+ if (!this.requestsRead.isEmpty()) {
+ for (CommitLog.GroupCommitRequest req : this.requestsRead) {
+ boolean transferOK =
HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
+ long waitUntilWhen =
HAService.this.defaultMessageStore.getSystemClock().now()
+
HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
- while (!transferOK &&
HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
- this.notifyTransferObject.waitForRunning(1000);
- transferOK =
HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
- }
-
- if (!transferOK) {
- log.warn("transfer messsage to slave timeout, " +
req.getNextOffset());
- }
+ while (!transferOK &&
HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
+ this.notifyTransferObject.waitForRunning(1000);
+ transferOK = HAService.this.push2SlaveMaxOffset.get()
>= req.getNextOffset();
+ }
- req.wakeupCustomer(transferOK ?
PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
+ if (!transferOK) {
+ log.warn("transfer messsage to slave timeout, " +
req.getNextOffset());
}
- this.requestsRead.clear();
+ req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK :
PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
+
+ this.requestsRead = new LinkedList<>();
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java b/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
index 75b7597..d5ed65f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
@@ -20,40 +20,43 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
public class WaitNotifyObject {
private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
- protected final HashMap<Long/* thread id */, Boolean/* notified */>
waitingThreadTable =
- new HashMap<Long, Boolean>(16);
+ protected final ConcurrentHashMap<Long/* thread id */, AtomicBoolean/*
notified */> waitingThreadTable =
+ new ConcurrentHashMap<Long, AtomicBoolean>(16);
- protected volatile boolean hasNotified = false;
+ protected AtomicBoolean hasNotified = new AtomicBoolean(false);
public void wakeup() {
- synchronized (this) {
- if (!this.hasNotified) {
- this.hasNotified = true;
+ boolean needNotify = hasNotified.compareAndSet(false, true);
+ if (needNotify) {
+ synchronized (this) {
this.notify();
}
}
}
protected void waitForRunning(long interval) {
+ if (this.hasNotified.compareAndSet(true, false)) {
+ this.onWaitEnd();
+ return;
+ }
synchronized (this) {
- if (this.hasNotified) {
- this.hasNotified = false;
- this.onWaitEnd();
- return;
- }
-
try {
+ if (this.hasNotified.compareAndSet(true, false)) {
+ this.onWaitEnd();
+ return;
+ }
this.wait(interval);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
- this.hasNotified = false;
+ this.hasNotified.set(false);
this.onWaitEnd();
}
}
@@ -63,15 +66,14 @@ public class WaitNotifyObject {
}
public void wakeupAll() {
- synchronized (this) {
- boolean needNotify = false;
-
- for (Map.Entry<Long,Boolean> entry :
this.waitingThreadTable.entrySet()) {
- needNotify = needNotify || !entry.getValue();
- entry.setValue(true);
+ boolean needNotify = false;
+ for (Map.Entry<Long,AtomicBoolean> entry :
this.waitingThreadTable.entrySet()) {
+ if (entry.getValue().compareAndSet(false, true)) {
+ needNotify = true;
}
-
- if (needNotify) {
+ }
+ if (needNotify) {
+ synchronized (this) {
this.notifyAll();
}
}
@@ -79,20 +81,22 @@ public class WaitNotifyObject {
public void allWaitForRunning(long interval) {
long currentThreadId = Thread.currentThread().getId();
+ AtomicBoolean notified =
this.waitingThreadTable.computeIfAbsent(currentThreadId, k -> new
AtomicBoolean(false));
+ if (notified.compareAndSet(true, false)) {
+ this.onWaitEnd();
+ return;
+ }
synchronized (this) {
- Boolean notified = this.waitingThreadTable.get(currentThreadId);
- if (notified != null && notified) {
- this.waitingThreadTable.put(currentThreadId, false);
- this.onWaitEnd();
- return;
- }
-
try {
+ if (notified.compareAndSet(true, false)) {
+ this.onWaitEnd();
+ return;
+ }
this.wait(interval);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
- this.waitingThreadTable.put(currentThreadId, false);
+ notified.set(false);
this.onWaitEnd();
}
}