This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 440be1ed4c [ISSUE #7031] fix Pop caused broker memory leak bug (#7032)
440be1ed4c is described below

commit 440be1ed4ce2af0ab58af6c3019de7075c09c20f
Author: fuyou001 <[email protected]>
AuthorDate: Mon Jul 17 19:23:23 2023 +0800

    [ISSUE #7031] fix Pop caused broker memory leak bug (#7032)
---
 .../broker/processor/PopBufferMergeService.java         | 17 ++++++++++++++++-
 .../rocketmq/broker/processor/PopMessageProcessor.java  | 11 ++++++-----
 2 files changed, 22 insertions(+), 6 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index d7bc7c6946..b7ba8ad4a2 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -429,9 +429,16 @@ public class PopBufferMergeService extends ServiceThread {
      * @param nextBeginOffset
      * @return
      */
-    public void addCkJustOffset(PopCheckPoint point, int reviveQueueId, long 
reviveQueueOffset, long nextBeginOffset) {
+    public boolean addCkJustOffset(PopCheckPoint point, int reviveQueueId, 
long reviveQueueOffset, long nextBeginOffset) {
         PopCheckPointWrapper pointWrapper = new 
PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset, 
true);
 
+        if (this.buffer.containsKey(pointWrapper.getMergeKey())) {
+            // when mergeKey conflict
+            // will cause PopBufferMergeService.scanCommitOffset cannot poll 
PopCheckPointWrapper
+            POP_LOGGER.warn("[PopBuffer]mergeKey conflict when add 
ckJustOffset. ck:{}, mergeKey:{}", pointWrapper, pointWrapper.getMergeKey());
+            return false;
+        }
+
         this.putCkToStore(pointWrapper, !checkQueueOk(pointWrapper));
 
         putOffsetQueue(pointWrapper);
@@ -440,6 +447,7 @@ public class PopBufferMergeService extends ServiceThread {
         if (brokerController.getBrokerConfig().isEnablePopLog()) {
             POP_LOGGER.info("[PopBuffer]add ck just offset, {}", pointWrapper);
         }
+        return  true;
     }
 
     public void addCkMock(String group, String topic, int queueId, long 
startOffset, long invisibleTime,
@@ -492,6 +500,13 @@ public class PopBufferMergeService extends ServiceThread {
             return false;
         }
 
+        if (this.buffer.containsKey(pointWrapper.getMergeKey())) {
+            // when mergeKey conflict
+            // will cause PopBufferMergeService.scanCommitOffset cannot poll 
PopCheckPointWrapper
+            POP_LOGGER.warn("[PopBuffer]mergeKey conflict when add ck. ck:{}, 
mergeKey:{}", pointWrapper, pointWrapper.getMergeKey());
+            return false;
+        }
+
         putOffsetQueue(pointWrapper);
         this.buffer.put(pointWrapper.getMergeKey(), pointWrapper);
         this.counter.incrementAndGet();
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 28549bfedc..464f8f4fda 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -570,7 +570,9 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
                         
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
                             requestHeader.getConsumerGroup(), topic, queueId, 
finalOffset);
                     } else {
-                        appendCheckPoint(requestHeader, topic, reviveQid, 
queueId, finalOffset, result, popTime, 
this.brokerController.getBrokerConfig().getBrokerName());
+                        if (!appendCheckPoint(requestHeader, topic, reviveQid, 
queueId, finalOffset, result, popTime, 
this.brokerController.getBrokerConfig().getBrokerName())) {
+                            return atomicRestNum.get() + 
result.getMessageCount();
+                        }
                     }
                     ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, 
isRetry, queueId, finalOffset);
                     ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, isRetry, 
queueId,
@@ -685,7 +687,7 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         return msgInner;
     }
 
-    private void appendCheckPoint(final PopMessageRequestHeader requestHeader,
+    private boolean appendCheckPoint(final PopMessageRequestHeader 
requestHeader,
         final String topic, final int reviveQid, final int queueId, final long 
offset,
         final GetMessageResult getMessageTmpResult, final long popTime, final 
String brokerName) {
         // add check point msg to revive log
@@ -708,10 +710,9 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         );
 
         if (addBufferSuc) {
-            return;
+            return true;
         }
-
-        this.popBufferMergeService.addCkJustOffset(
+        return this.popBufferMergeService.addCkJustOffset(
             ck, reviveQid, -1, getMessageTmpResult.getNextBeginOffset()
         );
     }

Reply via email to