guyinyou commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1100880726
##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,9 +2595,77 @@ public long getJoinTime() {
}
}
+ class BatchDispatchRequest {
+
+ private ByteBuffer byteBuffer;
+
+ private int position;
+
+ private int size;
+
+ private long id;
+
+ public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int
size, long id) {
+ this.byteBuffer = byteBuffer;
+ this.position = position;
+ this.size = size;
+ this.id = id;
+ }
+ }
+
+ class DispatchRequestOrderlyQueue {
+
+ DispatchRequest[][] buffer;
+
+ long ptr = 0;
+
+ AtomicLong maxPtr = new AtomicLong();
+
+ public DispatchRequestOrderlyQueue(int bufferNum) {
+ this.buffer = new DispatchRequest[bufferNum][];
+ }
+
+ public void put(long idx, DispatchRequest[] obj) {
+ while (ptr + this.buffer.length <= idx) {
+ synchronized (this) {
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ int mod = (int) (idx % this.buffer.length);
+ this.buffer[mod] = obj;
+ maxPtr.incrementAndGet();
+ }
+
+ public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+ synchronized (this) {
+ for (int i = 0; i < this.buffer.length; i++) {
+ int mod = (int) (ptr % this.buffer.length);
+ DispatchRequest[] ret = this.buffer[mod];
+ if (ret == null) {
+ this.notifyAll();
+ return null;
+ }
+ rets.add(ret);
+ this.buffer[mod] = null;
+ ptr++;
+ }
+ }
+ return null;
+ }
+
+ public synchronized boolean isEmpty() {
+ return maxPtr.get() == ptr;
+ }
+
+ }
+
class ReputMessageService extends ServiceThread {
- private volatile long reputFromOffset = 0;
+ public volatile long reputFromOffset = 0;
public long getReputFromOffset() {
Review Comment:
"protected" can be used for extends, no need "public"
##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2743,6 +2851,264 @@ public String getServiceName() {
}
+ class MainBatchDispatchRequestService extends ServiceThread {
+
+ private final ExecutorService batchDispatchRequestExecutor;
+
+ public MainBatchDispatchRequestService() {
+ batchDispatchRequestExecutor = new ThreadPoolExecutor(
+
DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+
DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MICROSECONDS,
+ new LinkedBlockingQueue<>(4096),
+ new
ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+ new ThreadPoolExecutor.AbortPolicy());
+ }
+
+ private void pollBatchDispatchRequest() {
+ try {
+ if (!batchDispatchRequestQueue.isEmpty()) {
+ BatchDispatchRequest task =
batchDispatchRequestQueue.peek();
+ batchDispatchRequestExecutor.execute(() -> {
+ ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+ tmpByteBuffer.position(task.position);
+ tmpByteBuffer.limit(task.position + task.size);
+ List<DispatchRequest> dispatchRequestList = new
ArrayList<>();
+ while (tmpByteBuffer.hasRemaining()) {
+ DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer,
false, false, false);
+ if (dispatchRequest.isSuccess()) {
+ dispatchRequestList.add(dispatchRequest);
+ } else {
+ LOGGER.error("[BUG]read total count not equals
msg total size.");
+ }
+ }
+ dispatchRequestOrderlyQueue.put(task.id,
dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+ mappedPageHoldCount.getAndDecrement();
+ });
+ batchDispatchRequestQueue.poll();
+ }
+ } catch (Exception e) {
+ LOGGER.warn(e.getMessage());
Review Comment:
Maybe the following example is right
`
batchDispatchRequestExecutor.execute(() -> {
try{xxxxxx}cache{}
})
`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]