nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1072411674
##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,10 +2576,181 @@ public long getJoinTime() {
}
}
+ class BatchDispatchRequest {
+
Review Comment:
> using ServiceThread notify and wait logic
Does it mean countdownlatch2? Specifically,
MainBatchDispatchRequestServiceThread notifications and waits between
ReputMessageService?
##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2689,6 +2865,97 @@ private void doReput() {
}
}
+ private void createBatchDispatchRequest(ByteBuffer byteBuffer, int
position, int size) {
+ if (position < 0) {
+ return;
+ }
+ mappedPageHoldCount.getAndIncrement();
+ BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer,
position, size, batchId++);
+ batchDispatchRequestQueue.offer(task);
+ }
+
+ private void doReputConcurrently() {
+ if (this.reputFromOffset <
DefaultMessageStore.this.commitLog.getMinOffset()) {
+ LOGGER.warn("The reputFromOffset={} is smaller than
minPyOffset={}, this usually indicate that the dispatch behind too much and the
commitlog has expired.",
+ this.reputFromOffset,
DefaultMessageStore.this.commitLog.getMinOffset());
+ this.reputFromOffset =
DefaultMessageStore.this.commitLog.getMinOffset();
+ }
+ for (boolean doNext = true; this.isCommitLogAvailable() && doNext;
) {
+
+ SelectMappedBufferResult result =
DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+ if (result == null) {
+ break;
+ }
+
+ int batchDispatchRequestStart = -1;
+ int batchDispatchRequestSize = -1;
+ try {
+ this.reputFromOffset = result.getStartOffset();
+
+ for (int readSize = 0; readSize < result.getSize() &&
reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+ ByteBuffer byteBuffer = result.getByteBuffer();
+
+ int totalSize = byteBuffer.getInt();
+ if (reputFromOffset + totalSize >
DefaultMessageStore.this.getConfirmOffset()) {
+ doNext = false;
+ break;
+ }
+
+ int magicCode = byteBuffer.getInt();
+ switch (magicCode) {
+ case CommitLog.MESSAGE_MAGIC_CODE:
+ break;
+ case CommitLog.BLANK_MAGIC_CODE:
+ totalSize = 0;
+ break;
+ default:
+ totalSize = -1;
+ doNext = false;
+ }
+
+ if (totalSize > 0) {
+ if (batchDispatchRequestStart == -1) {
+ batchDispatchRequestStart =
byteBuffer.position() - 8;
+ batchDispatchRequestSize = 0;
+ }
+ batchDispatchRequestSize += totalSize;
+ if (batchDispatchRequestSize > BATCH_SIZE) {
+ this.createBatchDispatchRequest(byteBuffer,
batchDispatchRequestStart, batchDispatchRequestSize);
+ batchDispatchRequestStart = -1;
+ batchDispatchRequestSize = -1;
+ }
+ byteBuffer.position(byteBuffer.position() +
totalSize - 8);
+ this.reputFromOffset += totalSize;
+ readSize += totalSize;
+ } else {
+ if (totalSize == 0) {
+ this.reputFromOffset =
DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
+ readSize = result.getSize();
+ }
+ this.createBatchDispatchRequest(byteBuffer,
batchDispatchRequestStart, batchDispatchRequestSize);
Review Comment:
> duplicate codeļ¼please refactor into a method
Did you say 2933 and 2934?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]