wang-jiahua commented on code in PR #10526:
URL: https://github.com/apache/rocketmq/pull/10526#discussion_r3429487828


##########
store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java:
##########
@@ -519,6 +530,26 @@ public boolean appendMessageUsingFileChannel(byte[] data) {
         return false;
     }
 
+    @Override
+    public boolean appendMessageUsingFileChannel(ByteBuffer data) {
+        int currentPos = WROTE_POSITION_UPDATER.get(this);
+        int len = data.remaining();
+
+        if ((currentPos + len) <= this.fileSize) {
+            try {
+                this.fileChannel.position(currentPos);
+                this.fileChannel.write(data);
+                WROTE_POSITION_UPDATER.addAndGet(this, len);
+                return true;
+            } catch (Throwable e) {
+                log.error("Error occurred when append message to mappedFile.", e);
+                return false;
+            }
+        }
+
+        return false;
+    }

Review Comment:
   Valid concern. In practice, `FileChannel.write()` on a local file with a small 
buffer usually writes all bytes in one call, but the API does not guarantee it. 
To be safe, I'll add a write loop to handle partial writes. Will fix in the next 
push.
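
A minimal sketch of what such a write loop could look like, not the actual fix. The class name `WriteFullyExample` and the helper `writeFully` are hypothetical and are not part of `DefaultMappedFile`. It assumes a positional `FileChannel.write(ByteBuffer, long)` is acceptable, which also avoids moving the channel's own position:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class WriteFullyExample {

    /**
     * Writes every remaining byte of {@code data} to {@code channel},
     * starting at file offset {@code position}. Returns the byte count written.
     */
    public static int writeFully(FileChannel channel, ByteBuffer data, long position)
            throws IOException {
        int total = 0;
        while (data.hasRemaining()) {
            // A single write may consume only part of the buffer, so keep
            // going until the buffer is drained.
            total += channel.write(data, position + total);
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("write-fully", ".bin");
        try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
            int written = writeFully(ch, ByteBuffer.wrap(new byte[] {1, 2, 3, 4}), 0);
            System.out.println("bytes written: " + written);
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

With a loop like this, the wrote-position counter would be advanced by the returned total only after the whole buffer has been written.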



##########
store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java:
##########
@@ -167,8 +177,25 @@ public void setMsgCount4Commercial(int msgCount4Commercial) {
         this.msgCount4Commercial = msgCount4Commercial;
     }
 
+    public void addQueueOffset(long offset) {
+        if (queueOffsetSize == messageQueueOffset.length) {
+            long[] newArr = new long[queueOffsetSize + (queueOffsetSize >> 1) + 1];
+            System.arraycopy(messageQueueOffset, 0, newArr, 0, queueOffsetSize);
+            messageQueueOffset = newArr;
+        }
+        messageQueueOffset[queueOffsetSize++] = offset;
+    }
+
     public List<Long> getMessageQueueOffset() {
-        return messageQueueOffset;
+        final long[] arr = this.messageQueueOffset;
+        final int size = this.queueOffsetSize;
+        return new AbstractList<Long>() {
+            @Override public Long get(int index) {
+                if (index < 0 || index >= size) throw new IndexOutOfBoundsException();
+                return arr[index];
+            }
+            @Override public int size() { return size; }
+        };
     }

Review Comment:
   Intentional change. The old `getMessageQueueOffset()` returned a mutable 
`List<Long>` which callers abused by calling `.add()` directly. The new API 
returns a read-only view and callers must use `addQueueOffset(long)` instead. 
All call sites have been updated (including the 3 test files in this PR).
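
A small standalone sketch of the same pattern, to show how the read-only view behaves from a caller's side. `OffsetBuffer`, `add`, and `view` are hypothetical names used only for illustration; the real class is `GetMessageResult`. Note that, as in the diff, the view captures the array and size at the moment it is created, so it acts as a snapshot rather than a live view:

```java
import java.util.AbstractList;
import java.util.List;

public class OffsetBuffer {
    private long[] offsets = new long[4];
    private int size;

    /** Appends one offset, growing the backing array by roughly 1.5x when full. */
    public void add(long offset) {
        if (size == offsets.length) {
            long[] grown = new long[size + (size >> 1) + 1];
            System.arraycopy(offsets, 0, grown, 0, size);
            offsets = grown;
        }
        offsets[size++] = offset;
    }

    /** Returns a read-only List over the offsets stored so far. */
    public List<Long> view() {
        final long[] arr = offsets;
        final int n = size;
        return new AbstractList<Long>() {
            @Override public Long get(int index) {
                if (index < 0 || index >= n) {
                    throw new IndexOutOfBoundsException("index " + index);
                }
                return arr[index]; // boxes only when an element is read
            }
            @Override public int size() { return n; }
        };
    }

    public static void main(String[] args) {
        OffsetBuffer buf = new OffsetBuffer();
        for (long i = 1; i <= 6; i++) {
            buf.add(i);
        }
        List<Long> view = buf.view();
        System.out.println("size = " + view.size());
        try {
            view.add(7L); // AbstractList does not implement add by default
        } catch (UnsupportedOperationException e) {
            System.out.println("add() on the view is rejected");
        }
    }
}
```

Because `AbstractList` throws `UnsupportedOperationException` from its default `add`, any remaining caller that still mutates the returned list fails fast instead of silently modifying internal state.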



##########
store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java:
##########
@@ -852,7 +852,8 @@ private boolean putMessagePositionInfo(final long offset, final int size, final
             this.setMaxPhysicOffset(offset + size);
             boolean appendResult;
             if (messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) {
-                appendResult = mappedFile.appendMessageUsingFileChannel(this.byteBufferIndex.array());
+                this.byteBufferIndex.flip();
+                appendResult = mappedFile.appendMessageUsingFileChannel(this.byteBufferIndex);

Review Comment:
   `byteBufferIndex` is a thread-local field used only within 
`appendMessageUsingFileChannel`, which is called from the dispatch thread 
(single-threaded). The `flip()` call is safe in this context because there is 
no concurrent access to this buffer.
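
For context on why the `flip()` is needed at all once the `ByteBuffer` itself is passed instead of `array()`, here is a minimal sketch. `FlipExample` and `fillEntry` are hypothetical names, and the 20-byte layout (long, int, long) is an assumption made for illustration:

```java
import java.nio.ByteBuffer;

public class FlipExample {

    /** Fills a 20-byte heap buffer with one fixed-size entry (assumed layout). */
    public static ByteBuffer fillEntry(long offset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(20);
        buf.putLong(offset);
        buf.putInt(size);
        buf.putLong(tagsCode);
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer buf = fillEntry(1024L, 256, 42L);

        // After the puts, position == limit == 20, so a channel write would
        // see nothing to write.
        System.out.println("remaining before flip: " + buf.remaining()); // prints 0

        // flip() sets limit to the current position and position to 0,
        // exposing the 20 bytes just written.
        buf.flip();
        System.out.println("remaining after flip: " + buf.remaining()); // prints 20
    }
}
```

The old `array()` call ignored position and limit entirely, which is why no `flip()` was needed before this change.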



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -667,23 +667,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner
             }
         }
 
-        long beginTime = this.getSystemClock().now();
-        CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
-
-        putResultFuture.thenAccept(result -> {
-            long elapsedTime = this.getSystemClock().now() - beginTime;
-            if (elapsedTime > 500) {
-                LOGGER.warn("DefaultMessageStore#putMessage: CommitLog#putMessage cost {}ms, topic={}, bodyLength={}",
-                    elapsedTime, msg.getTopic(), msg.getBody().length);
-            }
-            this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
-
-            if (null == result || !result.isOk()) {
-                this.storeStatsService.getPutMessageFailedTimes().add(1);
-            }
-        });
-
-        return putResultFuture;
+        return this.commitLog.asyncPutMessage(msg);

Review Comment:
   Good catch. The stats recording (`setPutMessageEntireTimeMax`, slow put 
logging, `putMessageFailedTimes`) was moved into `CommitLog.asyncPutMessage` 
itself in the full optimization branch. However, since this PR only includes 
the `DefaultMessageStore` change without the `CommitLog` changes (those are in 
S1), the stats are indeed lost in this PR alone. I'll add the stats recording 
back into `DefaultMessageStore` to avoid a regression. Will fix in the next push.
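
A simplified, standalone sketch of the pattern being restored: a side-effect callback attached to the future while the original future is still what gets returned. `PutStatsExample`, `withStats`, and the `Boolean` result type are hypothetical stand-ins for `DefaultMessageStore`, `StoreStatsService`, and `PutMessageResult`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class PutStatsExample {
    // Stand-ins for the counters kept by the real stats service.
    public final AtomicLong maxElapsedMillis = new AtomicLong();
    public final LongAdder failedTimes = new LongAdder();

    /**
     * Attaches timing and failure accounting to {@code putFuture} and returns
     * the same future, so callers observe exactly the same result as before.
     */
    public CompletableFuture<Boolean> withStats(CompletableFuture<Boolean> putFuture,
                                                long beginMillis) {
        putFuture.thenAccept(ok -> {
            long elapsed = System.currentTimeMillis() - beginMillis;
            maxElapsedMillis.accumulateAndGet(elapsed, Math::max);
            if (ok == null || !ok) {
                failedTimes.increment();
            }
        });
        return putFuture;
    }

    public static void main(String[] args) {
        PutStatsExample stats = new PutStatsExample();
        long begin = System.currentTimeMillis();
        stats.withStats(CompletableFuture.completedFuture(false), begin);
        stats.withStats(CompletableFuture.completedFuture(true), begin);
        System.out.println("failed puts: " + stats.failedTimes.sum());
    }
}
```

As in the removed code, `thenAccept` runs only on normal completion, so a future that completes exceptionally would not be counted as a failure here.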



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
