This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4450391d1 Fix restNum calculation in pop consumption mode (#5843)
4450391d1 is described below

commit 4450391d1679d46e4182189b0f11dcffb27cc498
Author: SSpirits <[email protected]>
AuthorDate: Mon Jan 9 17:34:43 2023 +0800

    Fix restNum calculation in pop consumption mode (#5843)
---
 .../broker/processor/PopMessageProcessor.java      | 70 +++++++++++-----------
 1 file changed, 34 insertions(+), 36 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 0f4de599a..5dca6c67b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -588,8 +588,8 @@ public class PopMessageProcessor implements NettyRequestProcessor {
                 }
                 return CompletableFuture.completedFuture(result);
             }).thenApply(result -> {
-                
atomicRestNum.set(brokerController.getMessageStore().getMaxOffsetInQueue(topic, 
queueId) - atomicOffset.get() + atomicRestNum.get());
                 if (result == null) {
+                    
atomicRestNum.set(brokerController.getMessageStore().getMaxOffsetInQueue(topic, 
queueId) - atomicOffset.get() + atomicRestNum.get());
                     return atomicRestNum.get();
                 }
                 if (!result.getMessageMapedList().isEmpty()) {
@@ -632,46 +632,44 @@ public class PopMessageProcessor implements NettyRequestProcessor {
 //                
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
 requestHeader.getConsumerGroup(), topic,
 //                        queueId, getMessageTmpResult.getNextBeginOffset());
                 }
-                atomicRestNum.set(result.getMaxOffset() - 
result.getNextBeginOffset() + atomicRestNum.get());
 
-                if (result != null) {
-                    String brokerName = 
brokerController.getBrokerConfig().getBrokerName();
-                    for (SelectMappedBufferResult mapedBuffer : 
result.getMessageMapedList()) {
-                        // We should not recode buffer for normal topic message
-                        if (!isRetry) {
-                            getMessageResult.addMessage(mapedBuffer);
-                        } else {
-                            List<MessageExt> messageExtList = 
MessageDecoder.decodesBatch(mapedBuffer.getByteBuffer(),
-                                true, false, true);
-                            mapedBuffer.release();
-                            for (MessageExt messageExt : messageExtList) {
-                                try {
-                                    String ckInfo = 
ExtraInfoUtil.buildExtraInfo(finalOffset, popTime, 
requestHeader.getInvisibleTime(),
-                                        reviveQid, messageExt.getTopic(), 
brokerName, messageExt.getQueueId(), messageExt.getQueueOffset());
-                                    
messageExt.getProperties().putIfAbsent(MessageConst.PROPERTY_POP_CK, ckInfo);
-
-                                    // Set retry message topic to origin topic 
and clear message store size to recode
-                                    
messageExt.setTopic(requestHeader.getTopic());
-                                    messageExt.setStoreSize(0);
-
-                                    byte[] encode = 
MessageDecoder.encode(messageExt, false);
-                                    ByteBuffer buffer = 
ByteBuffer.wrap(encode);
-                                    SelectMappedBufferResult tmpResult =
-                                        new 
SelectMappedBufferResult(mapedBuffer.getStartOffset(), buffer, encode.length, 
null);
-                                    getMessageResult.addMessage(tmpResult);
-                                } catch (Exception e) {
-                                    POP_LOGGER.error("Exception in recode 
retry message buffer, topic={}", topic, e);
-                                }
+                atomicRestNum.set(result.getMaxOffset() - 
result.getNextBeginOffset() + atomicRestNum.get());
+                String brokerName = 
brokerController.getBrokerConfig().getBrokerName();
+                for (SelectMappedBufferResult mapedBuffer : 
result.getMessageMapedList()) {
+                    // We should not recode buffer for normal topic message
+                    if (!isRetry) {
+                        getMessageResult.addMessage(mapedBuffer);
+                    } else {
+                        List<MessageExt> messageExtList = 
MessageDecoder.decodesBatch(mapedBuffer.getByteBuffer(),
+                            true, false, true);
+                        mapedBuffer.release();
+                        for (MessageExt messageExt : messageExtList) {
+                            try {
+                                String ckInfo = 
ExtraInfoUtil.buildExtraInfo(finalOffset, popTime, 
requestHeader.getInvisibleTime(),
+                                    reviveQid, messageExt.getTopic(), 
brokerName, messageExt.getQueueId(), messageExt.getQueueOffset());
+                                
messageExt.getProperties().putIfAbsent(MessageConst.PROPERTY_POP_CK, ckInfo);
+
+                                // Set retry message topic to origin topic and 
clear message store size to recode
+                                messageExt.setTopic(requestHeader.getTopic());
+                                messageExt.setStoreSize(0);
+
+                                byte[] encode = 
MessageDecoder.encode(messageExt, false);
+                                ByteBuffer buffer = ByteBuffer.wrap(encode);
+                                SelectMappedBufferResult tmpResult =
+                                    new 
SelectMappedBufferResult(mapedBuffer.getStartOffset(), buffer, encode.length, 
null);
+                                getMessageResult.addMessage(tmpResult);
+                            } catch (Exception e) {
+                                POP_LOGGER.error("Exception in recode retry 
message buffer, topic={}", topic, e);
                             }
                         }
                     }
-                    
this.brokerController.getPopInflightMessageCounter().incrementInFlightMessageNum(
-                        topic,
-                        requestHeader.getConsumerGroup(),
-                        queueId,
-                        result.getMessageCount()
-                    );
                 }
+                
this.brokerController.getPopInflightMessageCounter().incrementInFlightMessageNum(
+                    topic,
+                    requestHeader.getConsumerGroup(),
+                    queueId,
+                    result.getMessageCount()
+                );
                 return atomicRestNum.get();
             }).whenComplete((result, throwable) -> {
                 if (throwable != null) {

Reply via email to