nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1071814465
##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2689,6 +2865,97 @@ private void doReput() {
}
}
+ private void createBatchDispatchRequest(ByteBuffer byteBuffer, int
position, int size) {
+ if (position < 0) {
+ return;
+ }
+ mappedPageHoldCount.getAndIncrement();
+ BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer,
position, size, batchId++);
+ batchDispatchRequestQueue.offer(task);
+ }
+
+ private void doReputConcurrently() {
+ if (this.reputFromOffset <
DefaultMessageStore.this.commitLog.getMinOffset()) {
+ LOGGER.warn("The reputFromOffset={} is smaller than
minPyOffset={}, this usually indicate that the dispatch behind too much and the
commitlog has expired.",
+ this.reputFromOffset,
DefaultMessageStore.this.commitLog.getMinOffset());
+ this.reputFromOffset =
DefaultMessageStore.this.commitLog.getMinOffset();
+ }
+ for (boolean doNext = true; this.isCommitLogAvailable() && doNext;
) {
+
+ SelectMappedBufferResult result =
DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+ if (result == null) {
+ break;
+ }
+
+ int batchDispatchRequestStart = -1;
+ int batchDispatchRequestSize = -1;
+ try {
+ this.reputFromOffset = result.getStartOffset();
+
+ for (int readSize = 0; readSize < result.getSize() &&
reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+ ByteBuffer byteBuffer = result.getByteBuffer();
+
+ int totalSize = byteBuffer.getInt();
+ if (reputFromOffset + totalSize >
DefaultMessageStore.this.getConfirmOffset()) {
+ doNext = false;
+ break;
+ }
+
+ int magicCode = byteBuffer.getInt();
+ switch (magicCode) {
+ case CommitLog.MESSAGE_MAGIC_CODE:
+ break;
+ case CommitLog.BLANK_MAGIC_CODE:
+ totalSize = 0;
+ break;
+ default:
+ totalSize = -1;
+ doNext = false;
+ }
+
+ if (totalSize > 0) {
+ if (batchDispatchRequestStart == -1) {
+ batchDispatchRequestStart =
byteBuffer.position() - 8;
+ batchDispatchRequestSize = 0;
+ }
+ batchDispatchRequestSize += totalSize;
+ if (batchDispatchRequestSize > BATCH_SIZE) {
+ this.createBatchDispatchRequest(byteBuffer,
batchDispatchRequestStart, batchDispatchRequestSize);
+ batchDispatchRequestStart = -1;
+ batchDispatchRequestSize = -1;
+ }
+ byteBuffer.position(byteBuffer.position() +
totalSize - 8);
+ this.reputFromOffset += totalSize;
+ readSize += totalSize;
+ } else {
+ if (totalSize == 0) {
+ this.reputFromOffset =
DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
+ readSize = result.getSize();
+ }
+ this.createBatchDispatchRequest(byteBuffer,
batchDispatchRequestStart, batchDispatchRequestSize);
+ batchDispatchRequestStart = -1;
+ batchDispatchRequestSize = -1;
+ }
+ }
+ } catch (Throwable e) {
+ throw e;
Review Comment:
> This line seems redundant.
Yes, it has been deleted.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]