Copilot commented on code in PR #10526:
URL: https://github.com/apache/rocketmq/pull/10526#discussion_r3429201203


##########
store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java:
##########
@@ -519,6 +530,26 @@ public boolean appendMessageUsingFileChannel(byte[] data) {
         return false;
     }
 
+    @Override
+    public boolean appendMessageUsingFileChannel(ByteBuffer data) {
+        int currentPos = WROTE_POSITION_UPDATER.get(this);
+        int len = data.remaining();
+
+        if ((currentPos + len) <= this.fileSize) {
+            try {
+                this.fileChannel.position(currentPos);
+                this.fileChannel.write(data);
+                WROTE_POSITION_UPDATER.addAndGet(this, len);
+                return true;
+            } catch (Throwable e) {
+                log.error("Error occurred when append message to mappedFile.", 
e);
+                return false;
+            }
+        }
+
+        return false;
+    }

Review Comment:
   FileChannel.write(ByteBuffer)` is not guaranteed to write all remaining 
bytes in a single call. This implementation always increments `WROTE_POSITION` 
by `len` even if only a partial write occurred, which can corrupt subsequent 
writes and cause data loss. Loop until the buffer has no remaining bytes (or 
handle `0` writes appropriately), and increment `WROTE_POSITION` by the actual 
number of bytes written (accumulated).



##########
store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java:
##########
@@ -128,7 +133,12 @@ public void addMessage(final SelectMappedBufferResult 
mapedBuffer, final long qu
         this.msgCount4Commercial += (int) Math.ceil(
             mapedBuffer.getSize() /  (double)commercialSizePerMsg);
         this.messageCount++;
-        this.messageQueueOffset.add(queueOffset);
+        if (queueOffsetSize == messageQueueOffset.length) {
+            long[] newArr = new long[queueOffsetSize + (queueOffsetSize >> 1)];
+            System.arraycopy(messageQueueOffset, 0, newArr, 0, 
queueOffsetSize);
+            messageQueueOffset = newArr;
+        }
+        messageQueueOffset[queueOffsetSize++] = queueOffset;

Review Comment:
   new long[queueOffsetSize + (queueOffsetSize >> 1)]` can fail to grow the 
array when `queueOffsetSize == 1` (since `1 >> 1 == 0`), leading to an 
immediate `ArrayIndexOutOfBoundsException` on the next assignment. Ensure the 
growth calculation always increases capacity by at least 1 (for example, add `+ 
1` or use `Math.max(...)`). Note `addQueueOffset(...)` already accounts for 
this with `+ 1`, but `addMessage(...)` does not.



##########
store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java:
##########
@@ -167,8 +177,25 @@ public void setMsgCount4Commercial(int 
msgCount4Commercial) {
         this.msgCount4Commercial = msgCount4Commercial;
     }
 
+    public void addQueueOffset(long offset) {
+        if (queueOffsetSize == messageQueueOffset.length) {
+            long[] newArr = new long[queueOffsetSize + (queueOffsetSize >> 1) 
+ 1];
+            System.arraycopy(messageQueueOffset, 0, newArr, 0, 
queueOffsetSize);
+            messageQueueOffset = newArr;
+        }
+        messageQueueOffset[queueOffsetSize++] = offset;
+    }
+
     public List<Long> getMessageQueueOffset() {
-        return messageQueueOffset;
+        final long[] arr = this.messageQueueOffset;
+        final int size = this.queueOffsetSize;
+        return new AbstractList<Long>() {
+            @Override public Long get(int index) {
+                if (index < 0 || index >= size) throw new 
IndexOutOfBoundsException();
+                return arr[index];
+            }
+            @Override public int size() { return size; }
+        };
     }

Review Comment:
   getMessageQueueOffset()` previously returned a mutable `List<Long>` that 
callers could append to (as evidenced by updated tests). It now returns an 
`AbstractList` that does not support `add(...)`, which is a behavioral breaking 
change while keeping the same method signature. If external callers rely on 
mutability, this will now fail at runtime with `UnsupportedOperationException`. 
Consider either (a) returning a `List` implementation that supports `add(...)` 
by delegating to the internal array growth logic, or (b) making the method 
explicitly return an unmodifiable/snapshot view and updating the API contract 
(e.g., rename to indicate immutability and steer writers to `addQueueOffset`).



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -667,23 +667,7 @@ public CompletableFuture<PutMessageResult> 
asyncPutMessage(MessageExtBrokerInner
             }
         }
 
-        long beginTime = this.getSystemClock().now();
-        CompletableFuture<PutMessageResult> putResultFuture = 
this.commitLog.asyncPutMessage(msg);
-
-        putResultFuture.thenAccept(result -> {
-            long elapsedTime = this.getSystemClock().now() - beginTime;
-            if (elapsedTime > 500) {
-                LOGGER.warn("DefaultMessageStore#putMessage: 
CommitLog#putMessage cost {}ms, topic={}, bodyLength={}",
-                    elapsedTime, msg.getTopic(), msg.getBody().length);
-            }
-            this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
-
-            if (null == result || !result.isOk()) {
-                this.storeStatsService.getPutMessageFailedTimes().add(1);
-            }
-        });
-
-        return putResultFuture;
+        return this.commitLog.asyncPutMessage(msg);

Review Comment:
   This change removes the completion callback that recorded put-latency 
metrics (`setPutMessageEntireTimeMax`), logged slow puts, and incremented 
`putMessageFailedTimes` on failure. If these stats are not recorded elsewhere 
(e.g., inside `CommitLog`), operational visibility and failure counters will 
regress. Consider keeping the callback here or moving the metrics update into 
`CommitLog.asyncPutMessage(...)` so the behavior remains consistent.



##########
store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java:
##########
@@ -852,7 +852,8 @@ private boolean putMessagePositionInfo(final long offset, 
final int size, final
             this.setMaxPhysicOffset(offset + size);
             boolean appendResult;
             if 
(messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) {
-                appendResult = 
mappedFile.appendMessageUsingFileChannel(this.byteBufferIndex.array());
+                this.byteBufferIndex.flip();
+                appendResult = 
mappedFile.appendMessageUsingFileChannel(this.byteBufferIndex);

Review Comment:
   flip()` mutates the shared `byteBufferIndex` state (position/limit). Since 
`appendMessageUsingFileChannel(ByteBuffer)` will also advance the buffer 
position as it writes, this changes the buffer’s state for any subsequent 
reuse. To avoid subtle state coupling, prefer writing from a duplicated view 
(e.g., duplicate/rewind/limit on a temp buffer) or restore the original buffer 
state after the write (e.g., `clear()`), keeping `byteBufferIndex` consistently 
reusable for the next index entry composition.



##########
store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java:
##########
@@ -119,6 +119,8 @@ public interface MappedFile {
      */
     boolean appendMessageUsingFileChannel(byte[] data);
 
+    boolean appendMessageUsingFileChannel(java.nio.ByteBuffer data);

Review Comment:
   The new `appendMessageUsingFileChannel(ByteBuffer)` overload is added 
without any Javadoc, and it uses a fully-qualified `java.nio.ByteBuffer` type 
while the rest of the file uses simple type names (and other files already 
import `ByteBuffer`). Add a brief Javadoc describing the expected ByteBuffer 
state (e.g., it writes from `position..limit` and advances position), and 
consider importing `java.nio.ByteBuffer` for consistency/readability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to