This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 440be1ed4c [ISSUE #7031] fix Pop caused broker memory leak bug (#7032)
440be1ed4c is described below
commit 440be1ed4ce2af0ab58af6c3019de7075c09c20f
Author: fuyou001 <[email protected]>
AuthorDate: Mon Jul 17 19:23:23 2023 +0800
[ISSUE #7031] fix Pop caused broker memory leak bug (#7032)
---
.../broker/processor/PopBufferMergeService.java | 17 ++++++++++++++++-
.../rocketmq/broker/processor/PopMessageProcessor.java | 11 ++++++-----
2 files changed, 22 insertions(+), 6 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index d7bc7c6946..b7ba8ad4a2 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -429,9 +429,16 @@ public class PopBufferMergeService extends ServiceThread {
* @param nextBeginOffset
* @return
*/
- public void addCkJustOffset(PopCheckPoint point, int reviveQueueId, long
reviveQueueOffset, long nextBeginOffset) {
+ public boolean addCkJustOffset(PopCheckPoint point, int reviveQueueId,
long reviveQueueOffset, long nextBeginOffset) {
PopCheckPointWrapper pointWrapper = new
PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset,
true);
+ if (this.buffer.containsKey(pointWrapper.getMergeKey())) {
+ // when mergeKey conflict
+ // will cause PopBufferMergeService.scanCommitOffset cannot poll
PopCheckPointWrapper
+ POP_LOGGER.warn("[PopBuffer]mergeKey conflict when add
ckJustOffset. ck:{}, mergeKey:{}", pointWrapper, pointWrapper.getMergeKey());
+ return false;
+ }
+
this.putCkToStore(pointWrapper, !checkQueueOk(pointWrapper));
putOffsetQueue(pointWrapper);
@@ -440,6 +447,7 @@ public class PopBufferMergeService extends ServiceThread {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("[PopBuffer]add ck just offset, {}", pointWrapper);
}
+ return true;
}
public void addCkMock(String group, String topic, int queueId, long
startOffset, long invisibleTime,
@@ -492,6 +500,13 @@ public class PopBufferMergeService extends ServiceThread {
return false;
}
+ if (this.buffer.containsKey(pointWrapper.getMergeKey())) {
+ // when mergeKey conflict
+ // will cause PopBufferMergeService.scanCommitOffset cannot poll
PopCheckPointWrapper
+ POP_LOGGER.warn("[PopBuffer]mergeKey conflict when add ck. ck:{},
mergeKey:{}", pointWrapper, pointWrapper.getMergeKey());
+ return false;
+ }
+
putOffsetQueue(pointWrapper);
this.buffer.put(pointWrapper.getMergeKey(), pointWrapper);
this.counter.incrementAndGet();
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 28549bfedc..464f8f4fda 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -570,7 +570,9 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
requestHeader.getConsumerGroup(), topic, queueId,
finalOffset);
} else {
- appendCheckPoint(requestHeader, topic, reviveQid,
queueId, finalOffset, result, popTime,
this.brokerController.getBrokerConfig().getBrokerName());
+ if (!appendCheckPoint(requestHeader, topic, reviveQid,
queueId, finalOffset, result, popTime,
this.brokerController.getBrokerConfig().getBrokerName())) {
+ return atomicRestNum.get() +
result.getMessageCount();
+ }
}
ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo,
isRetry, queueId, finalOffset);
ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, isRetry,
queueId,
@@ -685,7 +687,7 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
return msgInner;
}
- private void appendCheckPoint(final PopMessageRequestHeader requestHeader,
+ private boolean appendCheckPoint(final PopMessageRequestHeader
requestHeader,
final String topic, final int reviveQid, final int queueId, final long
offset,
final GetMessageResult getMessageTmpResult, final long popTime, final
String brokerName) {
// add check point msg to revive log
@@ -708,10 +710,9 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
);
if (addBufferSuc) {
- return;
+ return true;
}
-
- this.popBufferMergeService.addCkJustOffset(
+ return this.popBufferMergeService.addCkJustOffset(
ck, reviveQid, -1, getMessageTmpResult.getNextBeginOffset()
);
}