nowinkeyy commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1072468992
##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2689,6 +2865,97 @@ private void doReput() {
}
}
+ private void createBatchDispatchRequest(ByteBuffer byteBuffer, int
position, int size) {
+ if (position < 0) {
+ return;
+ }
+ mappedPageHoldCount.getAndIncrement();
+ BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer,
position, size, batchId++);
+ batchDispatchRequestQueue.offer(task);
+ }
+
+ private void doReputConcurrently() {
+ if (this.reputFromOffset <
DefaultMessageStore.this.commitLog.getMinOffset()) {
+ LOGGER.warn("The reputFromOffset={} is smaller than
minPyOffset={}, this usually indicate that the dispatch behind too much and the
commitlog has expired.",
+ this.reputFromOffset,
DefaultMessageStore.this.commitLog.getMinOffset());
+ this.reputFromOffset =
DefaultMessageStore.this.commitLog.getMinOffset();
+ }
+ for (boolean doNext = true; this.isCommitLogAvailable() && doNext;
) {
+
+ SelectMappedBufferResult result =
DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+ if (result == null) {
+ break;
+ }
+
+ int batchDispatchRequestStart = -1;
+ int batchDispatchRequestSize = -1;
+ try {
+ this.reputFromOffset = result.getStartOffset();
+
+ for (int readSize = 0; readSize < result.getSize() &&
reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+ ByteBuffer byteBuffer = result.getByteBuffer();
+
+ int totalSize = byteBuffer.getInt();
+ if (reputFromOffset + totalSize >
DefaultMessageStore.this.getConfirmOffset()) {
+ doNext = false;
+ break;
+ }
+
+ int magicCode = byteBuffer.getInt();
+ switch (magicCode) {
+ case CommitLog.MESSAGE_MAGIC_CODE:
+ break;
+ case CommitLog.BLANK_MAGIC_CODE:
+ totalSize = 0;
+ break;
+ default:
+ totalSize = -1;
+ doNext = false;
+ }
+
+ if (totalSize > 0) {
+ if (batchDispatchRequestStart == -1) {
+ batchDispatchRequestStart =
byteBuffer.position() - 8;
Review Comment:
> using byteBuffer.mark(),byteBuffer.reset() may be more code readability
mark() and reset() have been used.(byteBuffer.mark(); byteBuffer.getInt();
byteBuffer.reset();) May I ask which part of the code comments are needed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]