This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 144254a5c444e460377d85a727c3c9074f98c195 Author: nowinkey <[email protected]> AuthorDate: Mon Jan 16 01:00:43 2023 +0800 [ISSUE #5884] Concurrent check CommitLog messages --- .../apache/rocketmq/store/DefaultMessageStore.java | 283 ++++++++++++++++++++- .../rocketmq/store/config/MessageStoreConfig.java | 13 + 2 files changed, 290 insertions(+), 6 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 11898f8cf..a4af44222 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -42,14 +42,19 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.AbstractBrokerRunnable; @@ -2571,10 +2576,181 @@ public class DefaultMessageStore implements MessageStore { } } + class BatchDispatchRequest { + + private ByteBuffer byteBuffer; + + private int position; + + private int size; + + private int id; + + public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, int id) { + this.byteBuffer = byteBuffer; + this.position = position; + this.size = size; + this.id = id; + } + } + + class DispatchRequestOrderlyQueue { + + DispatchRequest[][] buffer; + + int ptr = 0; + + AtomicInteger maxPtr = new AtomicInteger(); + + public DispatchRequestOrderlyQueue(int bufferNum) { + this.buffer = new DispatchRequest[bufferNum][]; + } + + public void put(int idx, DispatchRequest[] obj) { + while (ptr + this.buffer.length <= idx) { + synchronized (this) { + try { + this.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + int mod = idx % this.buffer.length; + this.buffer[mod] = obj; + maxPtr.incrementAndGet(); + } + + public DispatchRequest[] get(List<DispatchRequest[]> rets) { + synchronized (this) { + for (int i = 0; i < this.buffer.length; i++) { + int mod = ptr % this.buffer.length; + DispatchRequest[] ret = this.buffer[mod]; + if (ret == null) { + this.notifyAll(); + return null; + } + rets.add(ret); + this.buffer[mod] = null; + ptr++; + } + } + return null; + } + + public synchronized boolean isEmpty() { + return maxPtr.get() == ptr; + } + + } + class ReputMessageService extends ServiceThread { private volatile long reputFromOffset = 0; + private int batchId = 0; + + private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0); + + private static final int BATCH_SIZE = 1024 * 1024 * 4; + + private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>(); + + private int dispatchRequestOrderlyQueueSize = 16; + + private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize); + + private int batchDispatchRequestThreadPoolNums = 16; + + private ExecutorService batchDispatchRequestExecutor; + + public ReputMessageService() { + if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) { + initExecutorService(); + startBatchDispatchRequestService(); + } + } + + private void initExecutorService() { + batchDispatchRequestExecutor = new ThreadPoolExecutor( + this.batchDispatchRequestThreadPoolNums, + this.batchDispatchRequestThreadPoolNums, + 1000 * 60, + TimeUnit.MICROSECONDS, + new LinkedBlockingDeque<>(), + new ThreadFactoryImpl("BatchDispatchRequestServiceThread_")); + } + + private void startBatchDispatchRequestService() { + new Thread(() -> { + while (true) { + if (!batchDispatchRequestQueue.isEmpty()) { + BatchDispatchRequest task = batchDispatchRequestQueue.poll(); + batchDispatchRequestExecutor.execute(() -> { + ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate(); + tmpByteBuffer.position(task.position); + tmpByteBuffer.limit(task.position + task.size); + List<DispatchRequest> dispatchRequestList = new ArrayList<>(); + while (tmpByteBuffer.hasRemaining()) { + DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false); + if (dispatchRequest.isSuccess()) { + dispatchRequestList.add(dispatchRequest); + } else { + LOGGER.error("[BUG]read total count not equals msg total size."); + } + } + this.dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()])); + mappedPageHoldCount.getAndDecrement(); + }); + } else { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + }, "MainBatchDispatchRequestServiceThread").start(); + + new Thread(() -> { + List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>(); + while (true) { + dispatchRequestsList.clear(); + dispatchRequestOrderlyQueue.get(dispatchRequestsList); + if (!dispatchRequestsList.isEmpty()) { + for (DispatchRequest[] dispatchRequests : dispatchRequestsList) { + for (DispatchRequest dispatchRequest : dispatchRequests) { + DefaultMessageStore.this.doDispatch(dispatchRequest); + // wake up long-polling + if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable() + && DefaultMessageStore.this.messageArrivingListener != null) { + DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), + dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, + dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), + dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); + notifyMessageArrive4MultiQueue(dispatchRequest); + } + if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && + DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { + DefaultMessageStore.this.storeStatsService + .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1); + DefaultMessageStore.this.storeStatsService + .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) + .add(dispatchRequest.getMsgSize()); + } + } + } + } else { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + }, "DispatchServiceThread").start(); + } + public long getReputFromOffset() { return reputFromOffset; } @@ -2689,6 +2865,97 @@ public class DefaultMessageStore implements MessageStore { } } + private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) { + if (position < 0) { + return; + } + mappedPageHoldCount.getAndIncrement(); + BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++); + batchDispatchRequestQueue.offer(task); + } + + private void doReputConcurrently() { + if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) { + LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.", + this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset()); + this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset(); + } + for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { + + SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); + + if (result == null) { + break; + } + + int batchDispatchRequestStart = -1; + int batchDispatchRequestSize = -1; + try { + this.reputFromOffset = result.getStartOffset(); + + for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) { + ByteBuffer byteBuffer = result.getByteBuffer(); + + int totalSize = byteBuffer.getInt(); + if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) { + doNext = false; + break; + } + + int magicCode = byteBuffer.getInt(); + switch (magicCode) { + case CommitLog.MESSAGE_MAGIC_CODE: + break; + case CommitLog.BLANK_MAGIC_CODE: + totalSize = 0; + break; + default: + totalSize = -1; + doNext = false; + } + + if (totalSize > 0) { + if (batchDispatchRequestStart == -1) { + batchDispatchRequestStart = byteBuffer.position() - 8; + batchDispatchRequestSize = 0; + } + batchDispatchRequestSize += totalSize; + if (batchDispatchRequestSize > BATCH_SIZE) { + this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize); + batchDispatchRequestStart = -1; + batchDispatchRequestSize = -1; + } + byteBuffer.position(byteBuffer.position() + totalSize - 8); + this.reputFromOffset += totalSize; + readSize += totalSize; + } else { + if (totalSize == 0) { + this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); + readSize = result.getSize(); + } + this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize); + batchDispatchRequestStart = -1; + batchDispatchRequestSize = -1; + } + } + } catch (Throwable e) { + throw e; + } finally { + this.createBatchDispatchRequest(result.getByteBuffer(), batchDispatchRequestStart, batchDispatchRequestSize); + boolean over = this.mappedPageHoldCount.get() == 0; + while (!over) { + try { + Thread.sleep(1); + } catch (Exception e) { + e.printStackTrace(); + } + over = this.mappedPageHoldCount.get() == 0; + } + result.release(); + } + } + } + private void notifyMessageArrive4MultiQueue(DispatchRequest dispatchRequest) { Map<String, String> prop = dispatchRequest.getPropertiesMap(); if (prop == null) { @@ -2724,7 +2991,11 @@ public class DefaultMessageStore implements MessageStore { while (!this.isStopped()) { try { Thread.sleep(1); - this.doReput(); + if (!messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) { + this.doReput(); + } else { + doReputConcurrently(); + } } catch (Exception e) { DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e); } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index e29fdc2b0..a55a41df3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -376,6 +376,11 @@ public class MessageStoreConfig { */ private int sampleCountThreshold = 5000; + /** + * Build ConsumeQueue concurrently with multi-thread + */ + private boolean enableBuildConsumeQueueConcurrently = false; + public boolean isDebugLockEnable() { return debugLockEnable; } @@ -1600,4 +1605,12 @@ public class MessageStoreConfig { public void setSampleCountThreshold(int sampleCountThreshold) { this.sampleCountThreshold = sampleCountThreshold; } + + public boolean isEnableBuildConsumeQueueConcurrently() { + return enableBuildConsumeQueueConcurrently; + } + + public void setEnableBuildConsumeQueueConcurrently(boolean enableBuildConsumeQueueConcurrently) { + this.enableBuildConsumeQueueConcurrently = enableBuildConsumeQueueConcurrently; + } }
