This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 4450391d1 Fix restNum calculation in pop consumption mode (#5843)
4450391d1 is described below
commit 4450391d1679d46e4182189b0f11dcffb27cc498
Author: SSpirits <[email protected]>
AuthorDate: Mon Jan 9 17:34:43 2023 +0800
Fix restNum calculation in pop consumption mode (#5843)
---
.../broker/processor/PopMessageProcessor.java | 70 +++++++++++-----------
1 file changed, 34 insertions(+), 36 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 0f4de599a..5dca6c67b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -588,8 +588,8 @@ public class PopMessageProcessor implements NettyRequestProcessor {
}
return CompletableFuture.completedFuture(result);
}).thenApply(result -> {
-
atomicRestNum.set(brokerController.getMessageStore().getMaxOffsetInQueue(topic,
queueId) - atomicOffset.get() + atomicRestNum.get());
if (result == null) {
+
atomicRestNum.set(brokerController.getMessageStore().getMaxOffsetInQueue(topic,
queueId) - atomicOffset.get() + atomicRestNum.get());
return atomicRestNum.get();
}
if (!result.getMessageMapedList().isEmpty()) {
@@ -632,46 +632,44 @@ public class PopMessageProcessor implements NettyRequestProcessor {
//
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
requestHeader.getConsumerGroup(), topic,
// queueId, getMessageTmpResult.getNextBeginOffset());
}
- atomicRestNum.set(result.getMaxOffset() -
result.getNextBeginOffset() + atomicRestNum.get());
- if (result != null) {
- String brokerName =
brokerController.getBrokerConfig().getBrokerName();
- for (SelectMappedBufferResult mapedBuffer :
result.getMessageMapedList()) {
- // We should not recode buffer for normal topic message
- if (!isRetry) {
- getMessageResult.addMessage(mapedBuffer);
- } else {
- List<MessageExt> messageExtList =
MessageDecoder.decodesBatch(mapedBuffer.getByteBuffer(),
- true, false, true);
- mapedBuffer.release();
- for (MessageExt messageExt : messageExtList) {
- try {
- String ckInfo =
ExtraInfoUtil.buildExtraInfo(finalOffset, popTime,
requestHeader.getInvisibleTime(),
- reviveQid, messageExt.getTopic(),
brokerName, messageExt.getQueueId(), messageExt.getQueueOffset());
-
messageExt.getProperties().putIfAbsent(MessageConst.PROPERTY_POP_CK, ckInfo);
-
- // Set retry message topic to origin topic
and clear message store size to recode
-
messageExt.setTopic(requestHeader.getTopic());
- messageExt.setStoreSize(0);
-
- byte[] encode =
MessageDecoder.encode(messageExt, false);
- ByteBuffer buffer =
ByteBuffer.wrap(encode);
- SelectMappedBufferResult tmpResult =
- new
SelectMappedBufferResult(mapedBuffer.getStartOffset(), buffer, encode.length,
null);
- getMessageResult.addMessage(tmpResult);
- } catch (Exception e) {
- POP_LOGGER.error("Exception in recode
retry message buffer, topic={}", topic, e);
- }
+ atomicRestNum.set(result.getMaxOffset() -
result.getNextBeginOffset() + atomicRestNum.get());
+ String brokerName =
brokerController.getBrokerConfig().getBrokerName();
+ for (SelectMappedBufferResult mapedBuffer :
result.getMessageMapedList()) {
+ // We should not recode buffer for normal topic message
+ if (!isRetry) {
+ getMessageResult.addMessage(mapedBuffer);
+ } else {
+ List<MessageExt> messageExtList =
MessageDecoder.decodesBatch(mapedBuffer.getByteBuffer(),
+ true, false, true);
+ mapedBuffer.release();
+ for (MessageExt messageExt : messageExtList) {
+ try {
+ String ckInfo =
ExtraInfoUtil.buildExtraInfo(finalOffset, popTime,
requestHeader.getInvisibleTime(),
+ reviveQid, messageExt.getTopic(),
brokerName, messageExt.getQueueId(), messageExt.getQueueOffset());
+
messageExt.getProperties().putIfAbsent(MessageConst.PROPERTY_POP_CK, ckInfo);
+
+ // Set retry message topic to origin topic and
clear message store size to recode
+ messageExt.setTopic(requestHeader.getTopic());
+ messageExt.setStoreSize(0);
+
+ byte[] encode =
MessageDecoder.encode(messageExt, false);
+ ByteBuffer buffer = ByteBuffer.wrap(encode);
+ SelectMappedBufferResult tmpResult =
+ new
SelectMappedBufferResult(mapedBuffer.getStartOffset(), buffer, encode.length,
null);
+ getMessageResult.addMessage(tmpResult);
+ } catch (Exception e) {
+ POP_LOGGER.error("Exception in recode retry
message buffer, topic={}", topic, e);
}
}
}
-
this.brokerController.getPopInflightMessageCounter().incrementInFlightMessageNum(
- topic,
- requestHeader.getConsumerGroup(),
- queueId,
- result.getMessageCount()
- );
}
+
this.brokerController.getPopInflightMessageCounter().incrementInFlightMessageNum(
+ topic,
+ requestHeader.getConsumerGroup(),
+ queueId,
+ result.getMessageCount()
+ );
return atomicRestNum.get();
}).whenComplete((result, throwable) -> {
if (throwable != null) {