This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2113c16eb [RIP-48] Support reset offset in server-side for pop message (#5457)
2113c16eb is described below

commit 2113c16ebc4fa22cb47d812cda844b84882aa55f
Author: lizhimins <[email protected]>
AuthorDate: Mon Nov 7 13:55:03 2022 +0800

    [RIP-48] Support reset offset in server-side for pop message (#5457)
    
    * [RIP-48] Support reset offset in server-side for pop message
    
    * [RIP-48] Support reset offset in server-side for pop message
    
    * no need remove error log
    
    Co-authored-by: 斜阳 <[email protected]>
---
 .../broker/offset/ConsumerOrderInfoManager.java    |   7 +
 .../broker/processor/PopMessageProcessor.java      |  45 ++-
 .../test/client/rmq/RMQNormalConsumer.java         |  17 +-
 .../rocketmq/test/client/rmq/RMQPopClient.java     |  13 +-
 .../rocketmq/test/client/rmq/RMQPopConsumer.java   |  77 ++++-
 .../rocketmq/test/factory/ConsumerFactory.java     |   8 +-
 .../rocketmq/test/util/MQAdminTestUtils.java       |   2 +-
 .../rocketmq/test/offset/OffsetResetForPopIT.java  | 355 +++++++++++++++++++++
 .../apache/rocketmq/tools/command/CommandUtil.java |  13 +
 9 files changed, 505 insertions(+), 32 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
index 5b0bb9760..6f480f49d 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
@@ -159,6 +159,13 @@ public class ConsumerOrderInfoManager extends 
ConfigManager {
         return orderInfo.needBlock(invisibleTime);
     }
 
+    /**
+     * Removes the entry stored under {@code queueId} for the given topic and consumer group.
+     * <p>
+     * Nothing happens when {@code table} has no value for {@code buildKey(topic, group)}.
+     *
+     * @param topic   topic part of the lookup key
+     * @param group   consumer group part of the lookup key
+     * @param queueId id of the queue whose entry is removed
+     */
+    public void clearBlock(String topic, String group, int queueId) {
+        table.computeIfPresent(buildKey(topic, group), (key, val) -> {
+            // Hand the same value back so the topic/group mapping itself stays in the table
+            // (presumably a java.util.Map, where returning null would drop the key - confirm).
+            val.remove(queueId);
+            return val;
+        });
+    }
+
     /**
      * mark message is consumed finished. return the consumer offset
      *
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 0b8801ebf..ac1dd4615 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -498,12 +498,15 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         String lockKey =
             topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + 
PopAckConstants.SPLIT + queueId;
         boolean isOrder = requestHeader.isOrder();
-        long offset = getPopOffset(topic, requestHeader, queueId, false, 
lockKey);
+        long offset = getPopOffset(topic, requestHeader.getConsumerGroup(), 
queueId, requestHeader.getInitMode(),
+            false, lockKey, false);
         if (!queueLockManager.tryLock(lockKey)) {
             restNum = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 
offset + restNum;
             return restNum;
         }
-        offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);
+        offset = getPopOffset(topic, requestHeader.getConsumerGroup(), 
queueId, requestHeader.getInitMode(),
+            true, lockKey, true);
+
         GetMessageResult getMessageTmpResult = null;
         try {
             if (isOrder && 
brokerController.getConsumerOrderInfoManager().checkBlock(topic,
@@ -594,12 +597,12 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         return restNum;
     }
 
-    private long getPopOffset(String topic, PopMessageRequestHeader 
requestHeader, int queueId, boolean init,
-        String lockKey) {
-        long offset = 
this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),
-            topic, queueId);
+    private long getPopOffset(String topic, String group, int queueId, int 
initMode, boolean init, String lockKey,
+        boolean checkResetOffset) {
+
+        long offset = 
this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, 
queueId);
         if (offset < 0) {
-            if (ConsumeInitMode.MIN == requestHeader.getInitMode()) {
+            if (ConsumeInitMode.MIN == initMode) {
                 offset = 
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
             } else {
                 // pop last one,then commit offset.
@@ -609,17 +612,24 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
                     offset = 0;
                 }
                 if (init) {
-                    
this.brokerController.getConsumerOffsetManager().commitOffset("getPopOffset",
-                        requestHeader.getConsumerGroup(), topic,
-                        queueId, offset);
+                    
this.brokerController.getConsumerOffsetManager().commitOffset(
+                        "getPopOffset", group, topic, queueId, offset);
                 }
             }
         }
+
+        if (checkResetOffset) {
+            Long resetOffset = resetPopOffset(topic, group, queueId);
+            if (resetOffset != null) {
+                return resetOffset;
+            }
+        }
+
         long bufferOffset = 
this.popBufferMergeService.getLatestOffset(lockKey);
         if (bufferOffset < 0) {
             return offset;
         } else {
-            return bufferOffset > offset ? bufferOffset : offset;
+            return Math.max(bufferOffset, offset);
         }
     }
 
@@ -734,6 +744,19 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         );
     }
 
+    /**
+     * Consumes a pending reset offset for the given queue, if there is one, and makes it effective.
+     * <p>
+     * When a reset offset is found, the order-info block for the queue is cleared, the pop buffer's
+     * offset queue for the matching lock key is cleared, and the reset offset is committed as the
+     * consumer offset.
+     *
+     * @param topic   topic of the queue
+     * @param group   consumer group
+     * @param queueId queue id
+     * @return the reset offset that was applied, or {@code null} when none was pending
+     */
+    private Long resetPopOffset(String topic, String group, int queueId) {
+        Long pendingOffset =
+            this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topic, group, queueId);
+        if (pendingOffset == null) {
+            return null;
+        }
+
+        // Same key layout the pop path uses for its queue lock: topic, group and queue id joined by SPLIT.
+        String lockKey = topic + PopAckConstants.SPLIT + group + PopAckConstants.SPLIT + queueId;
+        this.brokerController.getConsumerOrderInfoManager().clearBlock(topic, group, queueId);
+        this.getPopBufferMergeService().clearOffsetQueue(lockKey);
+        this.brokerController.getConsumerOffsetManager()
+            .commitOffset("ResetPopOffset", group, topic, queueId, pendingOffset);
+        return pendingOffset;
+    }
+
     private byte[] readGetMessageResult(final GetMessageResult 
getMessageResult, final String group, final String topic,
         final int queueId) {
         final ByteBuffer byteBuffer = 
ByteBuffer.allocate(getMessageResult.getBufferTotalSize());
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java 
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
index 71f908887..7cbeaa810 100644
--- 
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
+++ 
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
@@ -25,7 +25,8 @@ import org.apache.rocketmq.test.listener.AbstractListener;
 import org.apache.rocketmq.test.util.RandomUtil;
 
 public class RMQNormalConsumer extends AbstractMQConsumer {
-    private static Logger logger = Logger.getLogger(RMQNormalConsumer.class);
+
+    private static final Logger LOGGER = 
Logger.getLogger(RMQNormalConsumer.class);
     protected DefaultMQPushConsumer consumer = null;
 
     public RMQNormalConsumer(String nsAddr, String topic, String subExpression,
@@ -33,18 +34,22 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
         super(nsAddr, topic, subExpression, consumerGroup, listener);
     }
 
+    @Override
     public AbstractListener getListener() {
         return listener;
     }
 
+    @Override
     public void setListener(AbstractListener listener) {
         this.listener = listener;
     }
 
+    @Override
     public void create() {
         create(false);
     }
 
+    @Override
     public void create(boolean useTLS) {
         consumer = new DefaultMQPushConsumer(consumerGroup);
         consumer.setInstanceName(RandomUtil.getStringByUUID());
@@ -53,19 +58,20 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
         try {
             consumer.subscribe(topic, subExpression);
         } catch (MQClientException e) {
-            logger.error("consumer subscribe failed!");
+            LOGGER.error("consumer subscribe failed!");
             e.printStackTrace();
         }
         consumer.setMessageListener(listener);
         consumer.setUseTLS(useTLS);
     }
 
+    @Override
     public void start() {
         try {
             consumer.start();
-            logger.info(String.format("consumer[%s] started!", 
consumer.getConsumerGroup()));
+            LOGGER.info(String.format("consumer[%s] started!", 
consumer.getConsumerGroup()));
         } catch (MQClientException e) {
-            logger.error("consumer start failed!");
+            LOGGER.error("consumer start failed!");
             e.printStackTrace();
         }
     }
@@ -74,11 +80,12 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
         try {
             consumer.subscribe(topic, subExpression);
         } catch (MQClientException e) {
-            logger.error("consumer subscribe failed!");
+            LOGGER.error("consumer subscribe failed!");
             e.printStackTrace();
         }
     }
 
+    @Override
     public void shutdown() {
         consumer.shutdown();
     }
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java 
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
index 558acb804..c502529ba 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
@@ -35,7 +35,9 @@ import org.apache.rocketmq.test.clientinterface.MQConsumer;
 import org.apache.rocketmq.test.util.RandomUtil;
 
 public class RMQPopClient implements MQConsumer {
+
     private static final long DEFAULT_TIMEOUT = 3000;
+
     private MQClientAPIImpl mqClientAPI;
 
     @Override
@@ -50,10 +52,8 @@ public class RMQPopClient implements MQConsumer {
 
         NettyClientConfig nettyClientConfig = new NettyClientConfig();
         nettyClientConfig.setUseTLS(useTLS);
-        this.mqClientAPI = new MQClientAPIImpl(nettyClientConfig,
-            new ClientRemotingProcessor(null),
-            null,
-            clientConfig);
+        this.mqClientAPI = new MQClientAPIImpl(
+            nettyClientConfig, new ClientRemotingProcessor(null), null, 
clientConfig);
     }
 
     @Override
@@ -103,8 +103,9 @@ public class RMQPopClient implements MQConsumer {
         return future;
     }
 
-    public CompletableFuture<AckResult> ackMessageAsync(String brokerAddr, 
String topic, String consumerGroup,
-        String extraInfo) {
+    public CompletableFuture<AckResult> ackMessageAsync(
+        String brokerAddr, String topic, String consumerGroup, String 
extraInfo) {
+
         String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
         AckMessageRequestHeader requestHeader = new AckMessageRequestHeader();
         requestHeader.setTopic(ExtraInfoUtil.getRealTopic(extraInfoStrs, 
topic, consumerGroup));
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopConsumer.java 
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopConsumer.java
index 036f60e1d..49a06bb76 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopConsumer.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopConsumer.java
@@ -17,17 +17,84 @@
 
 package org.apache.rocketmq.test.client.rmq;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.test.factory.ConsumerFactory;
 import org.apache.rocketmq.test.listener.AbstractListener;
 
 public class RMQPopConsumer extends RMQNormalConsumer {
+
+    private static final Logger log = Logger.getLogger(RMQPopConsumer.class);
+
+    public static final long POP_TIMEOUT = 3000;
+    public static final long DEFAULT_INVISIBLE_TIME = 30000;
+
+    private RMQPopClient client;
+
+    private int maxNum = 16;
+
     public RMQPopConsumer(String nsAddr, String topic, String subExpression,
-        String consumerGroup, AbstractListener listner) {
-        super(nsAddr, topic, subExpression, consumerGroup, listner);
+        String consumerGroup, AbstractListener listener) {
+        super(nsAddr, topic, subExpression, consumerGroup, listener);
+    }
+
+    public RMQPopConsumer(String nsAddr, String topic, String subExpression,
+        String consumerGroup, AbstractListener listener, int maxNum) {
+        super(nsAddr, topic, subExpression, consumerGroup, listener);
+        this.maxNum = maxNum;
+    }
+
+    @Override
+    public void start() {
+        client = ConsumerFactory.getRMQPopClient();
+        log.info(String.format("consumer[%s] started!", consumerGroup));
     }
 
     @Override
-    public void create() {
-        super.create();
-        consumer.setClientRebalance(false);
+    public void shutdown() {
+        client.shutdown();
+    }
+
+    public PopResult pop(String brokerAddr, MessageQueue mq) throws Exception {
+        return this.pop(brokerAddr, mq, DEFAULT_INVISIBLE_TIME, 5000);
+    }
+
+    public PopResult pop(String brokerAddr, MessageQueue mq, long 
invisibleTime, long timeout)
+        throws InterruptedException, RemotingException, MQClientException, 
MQBrokerException,
+        ExecutionException, TimeoutException {
+
+        CompletableFuture<PopResult> future = this.client.popMessageAsync(
+            brokerAddr, mq, invisibleTime, maxNum, consumerGroup, timeout, 
true,
+            ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
+
+        return future.get();
+    }
+
+    public PopResult popOrderly(String brokerAddr, MessageQueue mq) throws 
Exception {
+        return this.popOrderly(brokerAddr, mq, DEFAULT_INVISIBLE_TIME, 5000);
+    }
+
+    public PopResult popOrderly(String brokerAddr, MessageQueue mq, long 
invisibleTime, long timeout)
+        throws InterruptedException, ExecutionException {
+
+        CompletableFuture<PopResult> future = this.client.popMessageAsync(
+            brokerAddr, mq, invisibleTime, maxNum, consumerGroup, timeout, 
true,
+            ConsumeInitMode.MIN, true, ExpressionType.TAG, "*");
+
+        return future.get();
+    }
+
+    public CompletableFuture<AckResult> ackAsync(String brokerAddr, String 
extraInfo) {
+        return this.client.ackMessageAsync(brokerAddr, topic, consumerGroup, 
extraInfo);
     }
 }
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java 
b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
index 27f5dcbdd..cdda908f6 100644
--- a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
+++ b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
@@ -64,11 +64,11 @@ public class ConsumerFactory {
         consumer.start();
         return consumer;
     }
+
     public static RMQPopConsumer getRMQPopConsumer(String nsAddr, String 
consumerGroup,
-        String topic, String subExpression,
-        AbstractListener listener) {
-        RMQPopConsumer consumer = new RMQPopConsumer(nsAddr, topic, 
subExpression,
-            consumerGroup, listener);
+        String topic, String subExpression, AbstractListener listener) {
+
+        RMQPopConsumer consumer = new RMQPopConsumer(nsAddr, topic, 
subExpression, consumerGroup, listener);
         consumer.create();
         consumer.start();
         return consumer;
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java 
b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index c4d50f737..5f9f7a26e 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -77,7 +77,7 @@ public class MQAdminTestUtils {
         return true;
     }
 
-    private static boolean checkTopicExist(DefaultMQAdminExt mqAdminExt, 
String topic) {
+    public static boolean checkTopicExist(DefaultMQAdminExt mqAdminExt, String 
topic) {
         boolean createResult = false;
         try {
             TopicStatsTable topicInfo = mqAdminExt.examineTopicStats(topic);
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java 
b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
new file mode 100644
index 000000000..cf17d6260
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.offset;
+
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.client.rmq.RMQPopConsumer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
+import org.apache.rocketmq.test.util.MQAdminTestUtils;
+import org.apache.rocketmq.test.util.MQRandomUtils;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Integration tests for resetting a consumer group's offset on the broker while the group
+ * consumes through pop requests.
+ * <p>
+ * Each test runs against a freshly created topic with one read queue and one write queue on
+ * broker 1, so queue id 0 is the only queue involved.
+ */
+public class OffsetResetForPopIT extends BaseConf {
+
+    private static final Logger LOGGER = Logger.getLogger(OffsetResetForPopIT.class);
+
+    /** Number of messages produced by the tests that pop in a background thread. */
+    private static final int MESSAGE_COUNT = 10;
+
+    /** Upper bound on how long a background poller keeps popping. */
+    private static final long POLL_DURATION_MILLIS = 30 * 1000L;
+
+    private String topic;
+    private String group;
+    private RMQNormalProducer producer = null;
+    private RMQPopConsumer consumer = null;
+    private DefaultMQAdminExt adminExt;
+
+    /**
+     * Turns on server-side offset reset on broker 1 and prepares a fresh topic, group and producer.
+     */
+    @Before
+    public void setUp() throws Exception {
+        // resetting the pop offset relies on the server-side reset offset
+        brokerController1.getBrokerConfig().setUseServerSideResetOffset(true);
+
+        adminExt = BaseConf.getAdmin(NAMESRV_ADDR);
+        adminExt.start();
+
+        topic = MQRandomUtils.getRandomTopic();
+        this.createAndWaitTopicRegister(BROKER1_NAME, topic);
+        group = initConsumerGroup();
+        LOGGER.info(String.format("use topic: %s, group: %s", topic, group));
+        producer = getProducer(NAMESRV_ADDR, topic);
+    }
+
+    @After
+    public void tearDown() {
+        shutdown();
+    }
+
+    /**
+     * Creates {@code topic} with a single read/write queue on the master of {@code brokerName}
+     * and waits (at most 30 seconds) until the topic can be looked up through the admin client.
+     */
+    private void createAndWaitTopicRegister(String brokerName, String topic) throws Exception {
+        String brokerAddress = CommandUtil.fetchMasterAddrByBrokerName(adminExt, brokerName);
+        TopicConfig topicConfig = new TopicConfig(topic);
+        topicConfig.setReadQueueNums(1);
+        topicConfig.setWriteQueueNums(1);
+        adminExt.createAndUpdateTopicConfig(brokerAddress, topicConfig);
+
+        await().atMost(30, TimeUnit.SECONDS).until(
+            () -> MQAdminTestUtils.checkTopicExist(adminExt, topic));
+    }
+
+    /**
+     * Resets the offset of queue 0 for the current consumer's group and topic on broker 1.
+     * <p>
+     * Best effort: a failure is logged instead of thrown, so the calling test fails later on
+     * its own assertions while the cause stays visible in the log.
+     */
+    private void resetOffsetInner(long resetOffset) {
+        try {
+            // reset offset by queue
+            adminExt.resetOffsetByQueueId(brokerController1.getBrokerAddr(),
+                consumer.getConsumerGroup(), consumer.getTopic(), 0, resetOffset);
+        } catch (Exception e) {
+            LOGGER.error(String.format("reset offset to %d failed", resetOffset), e);
+        }
+    }
+
+    /**
+     * Acks one popped message through its pop check point property and waits for the result.
+     * Failures are logged; an interrupt is restored on the calling thread.
+     */
+    private void ackMessageSync(MessageExt messageExt) {
+        try {
+            consumer.ackAsync(brokerController1.getBrokerAddr(),
+                messageExt.getProperty(MessageConst.PROPERTY_POP_CK)).get();
+        } catch (InterruptedException e) {
+            // keep the interrupt visible so a background poller can stop promptly
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            LOGGER.error("ack message failed", e);
+        }
+    }
+
+    /** Acks every message of the list in order; a {@code null} list is ignored. */
+    private void ackMessageSync(List<MessageExt> messageExtList) {
+        if (messageExtList != null) {
+            messageExtList.forEach(this::ackMessageSync);
+        }
+    }
+
+    @Test
+    public void testResetOffsetAfterPop() throws Exception {
+        int messageCount = 10;
+        int resetOffset = 4;
+        producer.send(messageCount);
+        consumer = new RMQPopConsumer(NAMESRV_ADDR, topic, "*", group, new RMQNormalListener());
+        consumer.start();
+
+        MessageQueue mq = new MessageQueue(topic, BROKER1_NAME, 0);
+        PopResult popResult = consumer.pop(brokerController1.getBrokerAddr(), mq);
+        Assert.assertEquals(10, popResult.getMsgFoundList().size());
+
+        resetOffsetInner(resetOffset);
+        popResult = consumer.pop(brokerController1.getBrokerAddr(), mq);
+        Assert.assertTrue(popResult != null && popResult.getMsgFoundList() != null);
+        Assert.assertEquals(messageCount - resetOffset, popResult.getMsgFoundList().size());
+    }
+
+    @Test
+    public void testResetOffsetThenAckOldForPopOrderly() throws Exception {
+        int messageCount = 10;
+        int resetOffset = 2;
+        producer.send(messageCount);
+        consumer = new RMQPopConsumer(NAMESRV_ADDR, topic, "*", group, new RMQNormalListener());
+        consumer.start();
+
+        MessageQueue mq = new MessageQueue(topic, BROKER1_NAME, 0);
+        PopResult popResult1 = consumer.popOrderly(brokerController1.getBrokerAddr(), mq);
+        Assert.assertEquals(10, popResult1.getMsgFoundList().size());
+
+        resetOffsetInner(resetOffset);
+        ConsumeStats consumeStats = adminExt.examineConsumeStats(group, topic);
+        Assert.assertEquals(resetOffset, consumeStats.getOffsetTable().get(mq).getConsumerOffset());
+
+        PopResult popResult2 = consumer.popOrderly(brokerController1.getBrokerAddr(), mq);
+        Assert.assertTrue(popResult2 != null && popResult2.getMsgFoundList() != null);
+        Assert.assertEquals(messageCount - resetOffset, popResult2.getMsgFoundList().size());
+
+        // ack old msg, expect has no effect
+        ackMessageSync(popResult1.getMsgFoundList());
+        Assert.assertTrue(brokerController1.getConsumerOrderInfoManager()
+            .checkBlock(topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+
+        // ack new msg
+        ackMessageSync(popResult2.getMsgFoundList());
+        Assert.assertFalse(brokerController1.getConsumerOrderInfoManager()
+            .checkBlock(topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+    }
+
+    @Test
+    public void testRestOffsetToSkipMsgForPopOrderly() throws Exception {
+        int messageCount = 10;
+        int resetOffset = 4;
+        producer.send(messageCount);
+        consumer = new RMQPopConsumer(NAMESRV_ADDR, topic, "*", group, new RMQNormalListener());
+        resetOffsetInner(resetOffset);
+        consumer.start();
+
+        MessageQueue mq = new MessageQueue(topic, BROKER1_NAME, 0);
+        PopResult popResult = consumer.popOrderly(brokerController1.getBrokerAddr(), mq);
+        Assert.assertEquals(messageCount - resetOffset, popResult.getMsgFoundList().size());
+        Assert.assertTrue(brokerController1.getConsumerOrderInfoManager()
+            .checkBlock(topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+
+        ackMessageSync(popResult.getMsgFoundList());
+        TimeUnit.SECONDS.sleep(1);
+        Assert.assertFalse(brokerController1.getConsumerOrderInfoManager()
+            .checkBlock(topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+    }
+
+    @Test
+    public void testResetOffsetAfterPopWhenOpenBufferAndWait() throws Exception {
+        int messageCount = 10;
+        int resetOffset = 4;
+        brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
+        producer.send(messageCount);
+        consumer = new RMQPopConsumer(NAMESRV_ADDR, topic, "*", group, new RMQNormalListener());
+        consumer.start();
+
+        MessageQueue mq = new MessageQueue(topic, BROKER1_NAME, 0);
+        PopResult popResult = consumer.pop(brokerController1.getBrokerAddr(), mq);
+        Assert.assertEquals(10, popResult.getMsgFoundList().size());
+
+        resetOffsetInner(resetOffset);
+        // wait for the configured pop check point buffer stay time before popping again
+        TimeUnit.MILLISECONDS.sleep(brokerController1.getBrokerConfig().getPopCkStayBufferTimeOut());
+
+        popResult = consumer.pop(brokerController1.getBrokerAddr(), mq);
+        Assert.assertTrue(popResult != null && popResult.getMsgFoundList() != null);
+        Assert.assertEquals(messageCount - resetOffset, popResult.getMsgFoundList().size());
+    }
+
+    @Test
+    public void testResetOffsetWhilePopWhenOpenBuffer() {
+        testResetOffsetWhilePop(8, false, false, 5);
+    }
+
+    @Test
+    public void testResetOffsetWhilePopWhenOpenBufferAndAck() {
+        testResetOffsetWhilePop(8, false, true, 5);
+    }
+
+    @Test
+    public void testMultipleResetOffsetWhilePopWhenOpenBufferAndAck() {
+        testResetOffsetWhilePop(8, false, true, 3, 5);
+    }
+
+    @Test
+    public void testResetFutureOffsetWhilePopWhenOpenBufferAndAck() {
+        testResetOffsetWhilePop(2, true, true, 8);
+    }
+
+    @Test
+    public void testMultipleResetFutureOffsetWhilePopWhenOpenBufferAndAck() {
+        testResetOffsetWhilePop(2, true, true, 5, 8);
+    }
+
+    /**
+     * Pops one message per request in a background thread and resets the offset once
+     * {@code targetCount} messages have been popped, then waits (at most 10 seconds) for the
+     * total pop count to reach {@code targetCount + MESSAGE_COUNT - lastResetOffset}.
+     *
+     * @param targetCount number of popped messages after which the resets are issued
+     * @param resetFuture when {@code true}, the total pop count must also stay below
+     *                    {@link #MESSAGE_COUNT}
+     * @param needAck     whether popped messages are acked
+     * @param resetOffset offsets to reset to, applied in the given order
+     */
+    private void testResetOffsetWhilePop(int targetCount, boolean resetFuture, boolean needAck,
+        int... resetOffset) {
+        brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
+        producer.send(MESSAGE_COUNT);
+
+        // max pop one message per request
+        consumer =
+            new RMQPopConsumer(NAMESRV_ADDR, topic, "*", group, new RMQNormalListener(), 1);
+
+        MessageQueue mq = new MessageQueue(topic, BROKER1_NAME, 0);
+        AtomicInteger counter = new AtomicInteger(0);
+        consumer.start();
+
+        java.util.concurrent.ExecutorService poller = Executors.newSingleThreadExecutor();
+        try {
+            poller.execute(() -> {
+                long start = System.currentTimeMillis();
+                while (!Thread.currentThread().isInterrupted()
+                    && System.currentTimeMillis() - start <= POLL_DURATION_MILLIS) {
+                    try {
+                        PopResult popResult = consumer.pop(brokerController1.getBrokerAddr(), mq);
+                        if (popResult == null || popResult.getMsgFoundList() == null) {
+                            continue;
+                        }
+
+                        int count = counter.addAndGet(popResult.getMsgFoundList().size());
+                        if (needAck) {
+                            ackMessageSync(popResult.getMsgFoundList());
+                        }
+                        if (count == targetCount) {
+                            for (int offset : resetOffset) {
+                                resetOffsetInner(offset);
+                            }
+                        }
+                    } catch (InterruptedException e) {
+                        // the test is over; restore the flag so the loop condition ends the thread
+                        Thread.currentThread().interrupt();
+                    } catch (Exception e) {
+                        LOGGER.error("pop message failed", e);
+                    }
+                }
+            });
+
+            int expectedMinCount = targetCount + MESSAGE_COUNT - resetOffset[resetOffset.length - 1];
+            await().atMost(10, TimeUnit.SECONDS).until(() -> {
+                int popped = counter.get();
+                if (resetFuture && popped >= MESSAGE_COUNT) {
+                    return false;
+                }
+                return popped >= expectedMinCount;
+            });
+        } finally {
+            // do not leave the poller running into tearDown or the next test
+            poller.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testResetFutureOffsetWhilePopOrderlyAndAck() {
+        testResetOffsetWhilePopOrderly(1,
+            Lists.newArrayList(0, 5, 6, 7, 8, 9), Lists.newArrayList(5), 6);
+    }
+
+    @Test
+    public void testMultipleResetFutureOffsetWhilePopOrderlyAndAck() {
+        testResetOffsetWhilePopOrderly(1,
+            Lists.newArrayList(0, 5, 6, 7, 8, 9), Lists.newArrayList(3, 5), 6);
+    }
+
+    @Test
+    public void testResetOffsetWhilePopOrderlyAndAck() {
+        testResetOffsetWhilePopOrderly(5,
+            Lists.newArrayList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),
+            Lists.newArrayList(3), 12);
+    }
+
+    @Test
+    public void testMultipleResetOffsetWhilePopOrderlyAndAck() {
+        testResetOffsetWhilePopOrderly(5,
+            Lists.newArrayList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),
+            Lists.newArrayList(3, 1), 14);
+    }
+
+    /**
+     * Sends {@link #MESSAGE_COUNT} messages whose bodies are their index, pops and acks them
+     * orderly one at a time in a background thread, and resets the offset once
+     * {@code targetCount} messages have been popped. Then waits (at most 10 seconds) until
+     * exactly the expected bodies were seen and exactly {@code expectCount} messages were popped.
+     *
+     * @param targetCount      number of popped messages after which the resets are issued
+     * @param expectMsgReceive distinct message bodies expected to be received
+     * @param resetOffset      offsets to reset to, applied in the given order
+     * @param expectCount      expected total number of popped messages, duplicates included
+     */
+    private void testResetOffsetWhilePopOrderly(int targetCount, List<Integer> expectMsgReceive,
+        List<Integer> resetOffset, int expectCount) {
+        brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = new Message(topic, (String.valueOf(i)).getBytes());
+            producer.send(msg);
+        }
+        consumer = new RMQPopConsumer(NAMESRV_ADDR, topic, "*", group, new RMQNormalListener(), 1);
+        MessageQueue mq = new MessageQueue(topic, BROKER1_NAME, 0);
+        Set<Integer> msgReceive = Collections.newSetFromMap(new ConcurrentHashMap<>());
+        AtomicInteger counter = new AtomicInteger(0);
+        consumer.start();
+
+        java.util.concurrent.ExecutorService poller = Executors.newSingleThreadExecutor();
+        try {
+            poller.execute(() -> {
+                long start = System.currentTimeMillis();
+                while (!Thread.currentThread().isInterrupted()
+                    && System.currentTimeMillis() - start <= POLL_DURATION_MILLIS) {
+                    try {
+                        PopResult popResult = consumer.popOrderly(brokerController1.getBrokerAddr(), mq);
+                        if (popResult == null || popResult.getMsgFoundList() == null) {
+                            continue;
+                        }
+                        int count = counter.addAndGet(popResult.getMsgFoundList().size());
+                        for (MessageExt messageExt : popResult.getMsgFoundList()) {
+                            msgReceive.add(Integer.valueOf(new String(messageExt.getBody())));
+                            ackMessageSync(messageExt);
+                        }
+                        if (count == targetCount) {
+                            for (int offset : resetOffset) {
+                                resetOffsetInner(offset);
+                            }
+                        }
+                    } catch (InterruptedException e) {
+                        // the test is over; restore the flag so the loop condition ends the thread
+                        Thread.currentThread().interrupt();
+                    } catch (Exception e) {
+                        // a failed pop is simply retried; keep a trace for debugging only
+                        LOGGER.debug("pop orderly failed, retrying", e);
+                    }
+                }
+            });
+
+            await().atMost(10, TimeUnit.SECONDS).until(() ->
+                expectMsgReceive.size() == msgReceive.size()
+                    && counter.get() == expectCount
+                    && msgReceive.containsAll(expectMsgReceive));
+        } finally {
+            // do not leave the poller running into tearDown or the next test
+            poller.shutdownNow();
+        }
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java 
b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
index a6b612070..702918196 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
@@ -134,6 +134,19 @@ public class CommandUtil {
         return brokerAddressSet;
     }
 
+    /**
+     * Looks up the master address of the broker named {@code brokerName}.
+     *
+     * @param adminExt   admin client used to fetch the broker cluster info
+     * @param brokerName name of the broker whose master address is wanted
+     * @return the address registered under {@code MixAll.MASTER_ID} for that broker
+     * @throws Exception if fetching the cluster info fails, or if the broker or its master
+     *                   address is not present in the cluster info
+     */
+    public static String fetchMasterAddrByBrokerName(final MQAdminExt adminExt,
+        final String brokerName) throws Exception {
+        ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo();
+        BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
+        if (null != brokerData) {
+            String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+            if (addr != null) {
+                return addr;
+            }
+        }
+        // Report the name that was asked for; brokerData is null or lacks a master address here.
+        throw new Exception(String.format("No broker address for broker name %s.%n", brokerName));
+    }
+
     public static Set<String> fetchMasterAndSlaveAddrByBrokerName(final 
MQAdminExt adminExt, final String brokerName)
         throws InterruptedException, RemotingConnectException, 
RemotingTimeoutException,
         RemotingSendRequestException, MQBrokerException {

Reply via email to