This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit d211e8664d78713ac3dd5424440b1e119d418b14 Author: nowinkey <[email protected]> AuthorDate: Tue Feb 7 23:35:43 2023 +0800 Change the reject policy to AbortPolicy and add try catch final protect --- .../apache/rocketmq/store/DefaultMessageStore.java | 62 ++++++++++------------ 1 file changed, 28 insertions(+), 34 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index cf1745154..f12a90eea 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -43,7 +43,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.ExecutionException; @@ -2620,9 +2619,9 @@ public class DefaultMessageStore implements MessageStore { private int size; - private int id; + private long id; - public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, int id) { + public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, long id) { this.byteBuffer = byteBuffer; this.position = position; this.size = size; @@ -2642,7 +2641,7 @@ public class DefaultMessageStore implements MessageStore { this.buffer = new DispatchRequest[bufferNum][]; } - public void put(int idx, DispatchRequest[] obj) { + public void put(long idx, DispatchRequest[] obj) { while (ptr + this.buffer.length <= idx) { synchronized (this) { try { @@ -2652,7 +2651,7 @@ public class DefaultMessageStore implements MessageStore { } } } - int mod = idx % this.buffer.length; + int mod = (int) (idx % this.buffer.length); this.buffer[mod] = obj; maxPtr.incrementAndGet(); } @@ -2864,38 +2863,33 @@ public class DefaultMessageStore implements MessageStore { TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>(4096), new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"), - new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - try { - LOGGER.warn("Task {} is blocking put into the workQueue", r); - executor.getQueue().put(r); - } catch (InterruptedException e) { - LOGGER.error("Task {} failed to put into the workQueue", r); - } - } - }); + new ThreadPoolExecutor.AbortPolicy()); } private void pollBatchDispatchRequest() { - if (!batchDispatchRequestQueue.isEmpty()) { - BatchDispatchRequest task = batchDispatchRequestQueue.poll(); - batchDispatchRequestExecutor.execute(() -> { - ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate(); - tmpByteBuffer.position(task.position); - tmpByteBuffer.limit(task.position + task.size); - List<DispatchRequest> dispatchRequestList = new ArrayList<>(); - while (tmpByteBuffer.hasRemaining()) { - DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false); - if (dispatchRequest.isSuccess()) { - dispatchRequestList.add(dispatchRequest); - } else { - LOGGER.error("[BUG]read total count not equals msg total size."); + try { + if (!batchDispatchRequestQueue.isEmpty()) { + BatchDispatchRequest task = batchDispatchRequestQueue.peek(); + batchDispatchRequestExecutor.execute(() -> { + ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate(); + tmpByteBuffer.position(task.position); + tmpByteBuffer.limit(task.position + task.size); + List<DispatchRequest> dispatchRequestList = new ArrayList<>(); + while (tmpByteBuffer.hasRemaining()) { + DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false); + if (dispatchRequest.isSuccess()) { + dispatchRequestList.add(dispatchRequest); + } else { + LOGGER.error("[BUG]read total count not equals msg total size."); + } } - } - dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()])); - mappedPageHoldCount.getAndDecrement(); - }); + dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()])); + mappedPageHoldCount.getAndDecrement(); + }); + batchDispatchRequestQueue.poll(); + } + } catch (Exception e) { + LOGGER.warn(e.getMessage()); } } @@ -2987,7 +2981,7 @@ public class DefaultMessageStore implements MessageStore { private static final int BATCH_SIZE = 1024 * 1024 * 4; - private int batchId = 0; + private long batchId = 0; public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) { if (position < 0) {
