This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e3c2111fe [ISSUE #4821] Add some integration tests for POP consumption 
in slave-acting-master mode (#4822)
e3c2111fe is described below

commit e3c2111febcc90e64c3860cef539bb6b9780c12c
Author: caigy <[email protected]>
AuthorDate: Tue Aug 16 21:39:41 2022 +0800

    [ISSUE #4821] Add some integration tests for POP consumption in 
slave-acting-master mode (#4822)
    
    * fix #4821
    
    * add license header and ignore executing integration tests in automatic 
flow
    
    * fix unit test
    
    * fix unit test
---
 .../processor/ChangeInvisibleTimeProcessor.java    |   2 +-
 .../broker/processor/PopBufferMergeService.java    |  13 +-
 .../broker/processor/PopMessageProcessor.java      |   4 +-
 .../broker/processor/PopReviveService.java         |   2 +-
 .../ChangeInvisibleTimeProcessorTest.java          |  11 +-
 .../broker/processor/PopMessageProcessorTest.java  |   1 -
 .../test/container/PopSlaveActingMasterIT.java     | 588 +++++++++++++++++++++
 7 files changed, 613 insertions(+), 8 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index bb68badc1..76c1b908e 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -189,7 +189,7 @@ public class ChangeInvisibleTimeProcessor implements 
NettyRequestProcessor {
         msgInner.setDeliverTimeMs(ck.getReviveTime() - 
PopAckConstants.ackTimeInterval);
         
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
 PopMessageProcessor.genCkUniqueId(ck));
         
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
-        PutMessageResult putMessageResult = 
this.brokerController.getMessageStore().putMessage(msgInner);
+        PutMessageResult putMessageResult = 
this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
 
         if (brokerController.getBrokerConfig().isEnablePopLog()) {
             POP_LOGGER.info("change Invisible , appendCheckPoint, topic {}, 
queueId {},reviveId {}, cid {}, startOffset {}, rt {}, result {}", 
requestHeader.getTopic(), queueId, reviveQid, requestHeader.getConsumerGroup(), 
offset,
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 4b2df0875..bb432a851 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -538,7 +538,7 @@ public class PopBufferMergeService extends ServiceThread {
             return;
         }
         MessageExtBrokerInner msgInner = 
popMessageProcessor.buildCkMsg(pointWrapper.getCk(), 
pointWrapper.getReviveQueueId());
-        PutMessageResult putMessageResult = 
brokerController.getMessageStore().putMessage(msgInner);
+        PutMessageResult putMessageResult = 
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
         if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
             && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_DISK_TIMEOUT
             && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
@@ -547,7 +547,14 @@ public class PopBufferMergeService extends ServiceThread {
             return;
         }
         pointWrapper.setCkStored(true);
-        
pointWrapper.setReviveQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
+
+        if (putMessageResult.isRemotePut()) {
+            //No AppendMessageResult when escaping remotely
+            pointWrapper.setReviveQueueOffset(0);
+        } else {
+            
pointWrapper.setReviveQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
+        }
+
         if (brokerController.getBrokerConfig().isEnablePopLog()) {
             POP_LOGGER.info("[PopBuffer]put ck to store ok: {}, {}", 
pointWrapper, putMessageResult);
         }
@@ -575,7 +582,7 @@ public class PopBufferMergeService extends ServiceThread {
         
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
 PopMessageProcessor.genAckUniqueId(ackMsg));
 
         
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
-        PutMessageResult putMessageResult = 
brokerController.getMessageStore().putMessage(msgInner);
+        PutMessageResult putMessageResult = 
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
         if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
             && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_DISK_TIMEOUT
             && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 842d0d98b..0d2c5f9b5 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -477,7 +477,7 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
             return restNum;
         }
         offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);
-        GetMessageResult getMessageTmpResult;
+        GetMessageResult getMessageTmpResult = null;
         try {
             if (isOrder && 
brokerController.getConsumerOrderInfoManager().checkBlock(topic,
                 requestHeader.getConsumerGroup(), queueId, 
requestHeader.getInvisibleTime())) {
@@ -544,6 +544,8 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
 //                
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
 requestHeader.getConsumerGroup(), topic,
 //                        queueId, getMessageTmpResult.getNextBeginOffset());
             }
+        } catch (Exception e) {
+            POP_LOGGER.error("Exception in popMsgFromQueue", e);
         } finally {
             queueLockManager.unLock(lockKey);
         }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 1b95cce4e..1a6c52ec3 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -106,7 +106,7 @@ public class PopReviveService extends ServiceThread {
         }
         
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
         addRetryTopicIfNoExit(msgInner.getTopic(), popCheckPoint.getCId());
-        PutMessageResult putMessageResult = 
brokerController.getMessageStore().putMessage(msgInner);
+        PutMessageResult putMessageResult = 
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
         if (brokerController.getBrokerConfig().isEnablePopLog()) {
             POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId 
{}, offset {}, reviveDelay={}, result is {} ",
                 queueId, popCheckPoint, messageExt.getQueueId(), 
messageExt.getQueueOffset(),
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
index 2a009e951..811913a26 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Field;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.net.Broker2Client;
+import org.apache.rocketmq.broker.failover.EscapeBridge;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -75,12 +76,20 @@ public class ChangeInvisibleTimeProcessorTest {
     @Mock
     private Broker2Client broker2Client;
 
+    @Mock
+    private EscapeBridge escapeBridge = new 
EscapeBridge(this.brokerController);
+
     @Before
     public void init() throws IllegalAccessException, NoSuchFieldException {
         brokerController.setMessageStore(messageStore);
         Field field = BrokerController.class.getDeclaredField("broker2Client");
         field.setAccessible(true);
         field.set(brokerController, broker2Client);
+
+        Field ebField = 
BrokerController.class.getDeclaredField("escapeBridge");
+        ebField.setAccessible(true);
+        ebField.set(brokerController, this.escapeBridge);
+
         Channel mockChannel = mock(Channel.class);
         when(handlerContext.channel()).thenReturn(mockChannel);
         
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new 
TopicConfig());
@@ -99,7 +108,7 @@ public class ChangeInvisibleTimeProcessorTest {
 
     @Test
     public void testProcessRequest_Success() throws RemotingCommandException, 
InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
-        
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new 
PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+        
when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(new
 PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
         int queueId = 0;
         long queueOffset = 0;
         long popTime = System.currentTimeMillis() - 1_000;
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
index af6b4bb55..7ea20ceff 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
@@ -78,7 +78,6 @@ public class PopMessageProcessorTest {
     public void init() {
         brokerController.setMessageStore(messageStore);
         popMessageProcessor = new PopMessageProcessor(brokerController);
-        when(messageStore.putMessage(any())).thenReturn(new 
PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
         Channel mockChannel = mock(Channel.class);
         when(mockChannel.remoteAddress()).thenReturn(new 
InetSocketAddress(1024));
         when(handlerContext.channel()).thenReturn(mockChannel);
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java
new file mode 100644
index 000000000..17d5e0cc3
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java
@@ -0,0 +1,588 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.BrokerIdentity;
+import org.apache.rocketmq.common.KeyBuilder;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageRequestMode;
+import org.apache.rocketmq.container.BrokerContainer;
+import org.apache.rocketmq.container.InnerBrokerController;
+import org.apache.rocketmq.container.InnerSalveBrokerController;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+@Ignore
+public class PopSlaveActingMasterIT extends ContainerIntegrationTestBase {
+    private static final String CONSUME_GROUP = 
PopSlaveActingMasterIT.class.getSimpleName() + "_Consumer";
+    private final static int MESSAGE_COUNT = 16;
+    private final static Random random = new Random();
+    private static DefaultMQProducer producer;
+    private final static String MESSAGE_STRING = 
RandomStringUtils.random(1024);
+    private static byte[] MESSAGE_BODY;
+
+    public PopSlaveActingMasterIT() {
+    }
+
+    static {
+        try {
+            MESSAGE_BODY = 
MESSAGE_STRING.getBytes(RemotingHelper.DEFAULT_CHARSET);
+        } catch (UnsupportedEncodingException e) {
+            e.printStackTrace();
+        }
+    }
+
+    void createTopic(String topic) {
+        createTopicTo(master1With3Replicas, topic, 1, 1);
+        createTopicTo(master2With3Replicas, topic, 1, 1);
+        createTopicTo(master3With3Replicas, topic, 1, 1);
+        System.out.println("Topic [" + topic + "] created");
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Throwable {
+        producer = createProducer(PopSlaveActingMasterIT.class.getSimpleName() 
+ "_PRODUCER");
+        producer.setSendMsgTimeout(5000);
+        producer.start();
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        producer.shutdown();
+    }
+
+
+    @Test
+    public void testLocalActing_ackSlave() throws Exception {
+        String topic = PopSlaveActingMasterIT.class.getSimpleName() + 
random.nextInt(65535);
+        createTopic(topic);
+        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
CONSUME_GROUP);
+        createTopic(retryTopic);
+
+        this.switchPop(topic);
+
+        
producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
+
+        MessageQueue messageQueue = new MessageQueue(topic, 
master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+        int sendSuccess = 0;
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = new Message(topic, MESSAGE_BODY);
+            SendResult sendResult = producer.send(msg, messageQueue);
+            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+                System.out.println("send message id: " + 
sendResult.getMsgId());
+                sendSuccess++;
+            }
+        }
+
+        System.out.printf("send success %d%n", sendSuccess);
+        final int finalSendSuccess = sendSuccess;
+        await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >= 
MESSAGE_COUNT);
+
+        isolateBroker(master1With3Replicas);
+        System.out.printf("isolate master1%n");
+
+        DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP);
+        consumer.subscribe(topic, "*");
+        
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        List<String> consumedMessages = new CopyOnWriteArrayList<>();
+        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, 
context) -> {
+            msgs.forEach(msg -> {
+                System.out.println("receive msg id: " + msg.getMsgId());
+                consumedMessages.add(msg.getMsgId());
+            });
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        consumer.setClientRebalance(false);
+        consumer.start();
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> 
consumedMessages.size() >= MESSAGE_COUNT);
+        System.out.printf("%s pop receive msg count: %d%n", 
LocalDateTime.now(), consumedMessages.size());
+
+        consumer.shutdown();
+
+        List<String> retryMsgList = new CopyOnWriteArrayList<>();
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+        pushConsumer.subscribe(retryTopic, "*");
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) 
(msgs, context) -> {
+            for (MessageExt msg : msgs) {
+                System.out.printf("receive retry msg: %s %s%n", new 
String(msg.getBody()), msg);
+                retryMsgList.add(new String(msg.getBody()));
+            }
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        pushConsumer.start();
+
+        System.out.printf("wait for ack revive%n");
+        Thread.sleep(10000L);
+
+        assertThat(retryMsgList.size()).isEqualTo(0);
+
+        cancelIsolatedBroker(master1With3Replicas);
+        awaitUntilSlaveOK();
+
+        pushConsumer.shutdown();
+    }
+
+    @Test
+    public void testLocalActing_notAckSlave() throws Exception {
+//        master1With3Replicas.getBrokerConfig().setReviveMaxSlow(0L);
+//        master1With3Replicas.getBrokerConfig().setReviveInterval(0L);
+//        
//master1With3Replicas.getMessageStoreConfig().setMessageDelayLevel("1s 1s 1s 
1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s");
+//
+//        master2With3Replicas.getBrokerConfig().setReviveMaxSlow(0L);
+//        master2With3Replicas.getBrokerConfig().setReviveInterval(0L);
+//        
//master2With3Replicas.getMessageStoreConfig().setMessageDelayLevel("1s 1s 1s 
1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s");
+//
+//        master3With3Replicas.getBrokerConfig().setReviveMaxSlow(0L);
+//        master3With3Replicas.getBrokerConfig().setReviveInterval(0L);
+//        
//master3With3Replicas.getMessageStoreConfig().setMessageDelayLevel("1s 1s 1s 
1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s");
+
+        String topic = PopSlaveActingMasterIT.class.getSimpleName() + 
random.nextInt(65535);
+        createTopic(topic);
+        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
CONSUME_GROUP);
+        createTopic(retryTopic);
+
+        this.switchPop(topic);
+
+        
producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
+
+        Set<String> sendToIsolateMsgSet = new HashSet<>();
+        MessageQueue messageQueue = new MessageQueue(topic, 
master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+        int sendSuccess = 0;
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = new Message(topic, MESSAGE_BODY);
+            SendResult sendResult = producer.send(msg, messageQueue);
+            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+                sendToIsolateMsgSet.add(new String(msg.getBody()));
+                sendSuccess++;
+            }
+        }
+
+        System.out.printf("send success %d%n", sendSuccess);
+        final int finalSendSuccess = sendSuccess;
+        await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >= 
MESSAGE_COUNT);
+
+        isolateBroker(master1With3Replicas);
+        System.out.printf("isolate master1%n");
+
+        DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP);
+        consumer.subscribe(topic, "*");
+        
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        consumer.setPopInvisibleTime(5000L);
+        List<String> consumedMessages = new CopyOnWriteArrayList<>();
+        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, 
context) -> {
+            msgs.forEach(msg -> {
+                System.out.println("receive msg id: " + msg.getMsgId());
+
+                msg.setReconsumeTimes(0);
+
+                consumedMessages.add(msg.getMsgId());
+            });
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+        });
+        consumer.setClientRebalance(false);
+        consumer.start();
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> 
consumedMessages.size() >= MESSAGE_COUNT);
+        consumer.shutdown();
+
+        List<String> retryMsgList = new CopyOnWriteArrayList<>();
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+        pushConsumer.subscribe(retryTopic, "*");
+        
pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) 
(msgs, context) -> {
+            for (MessageExt msg : msgs) {
+                System.out.printf("receive retry msg: %s%n", 
msg.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
+                retryMsgList.add(new String(msg.getBody()));
+            }
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        pushConsumer.start();
+
+        System.out.printf(LocalDateTime.now() + ": wait for ack revive%n");
+
+        AtomicInteger failCnt = new AtomicInteger(0);
+        
await().atMost(Duration.ofMinutes(3)).pollInterval(Duration.ofSeconds(10)).until(()
 -> {
+            if (retryMsgList.size() < MESSAGE_COUNT) {
+                System.out.println("check FAILED" + failCnt.incrementAndGet() 
+ ": retryMsgList.size=" + retryMsgList.size() + " less than " + MESSAGE_COUNT);
+                return false;
+            }
+
+            for (String msgBodyString : retryMsgList) {
+                if (!sendToIsolateMsgSet.contains(msgBodyString)) {
+                    System.out.println("check FAILED: sendToIsolateMsgSet 
doesn't contain " + msgBodyString);
+                    return false;
+                }
+            }
+            return true;
+        });
+
+        System.out.printf(LocalDateTime.now() + ": receive retry msg 
size=%d%n", retryMsgList.size());
+
+        cancelIsolatedBroker(master1With3Replicas);
+        awaitUntilSlaveOK();
+
+        pushConsumer.shutdown();
+    }
+
+    @Test
+    public void testRemoteActing_ackSlave() throws Exception {
+        String topic = PopSlaveActingMasterIT.class.getSimpleName() + 
random.nextInt(65535);
+        createTopic(topic);
+        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
CONSUME_GROUP);
+        createTopic(retryTopic);
+
+        switchPop(topic);
+
+        
producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
+
+        MessageQueue messageQueue = new MessageQueue(topic, 
master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+        int sendSuccess = 0;
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = new Message(topic, MESSAGE_BODY);
+            SendResult sendResult = producer.send(msg, messageQueue);
+            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+                System.out.println("Send message id: " + 
sendResult.getMsgId());
+                sendSuccess++;
+            }
+        }
+
+        System.out.printf("%s send success %d%n", LocalDateTime.now(), 
sendSuccess);
+        final int finalSendSuccess = sendSuccess;
+        await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >= 
MESSAGE_COUNT);
+
+        isolateBroker(master1With3Replicas);
+        System.out.printf("%s isolate master1%n", LocalDateTime.now());
+
+        isolateBroker(master2With3Replicas);
+        brokerContainer2.removeBroker(new BrokerIdentity(
+                master2With3Replicas.getBrokerConfig().getBrokerClusterName(),
+                master2With3Replicas.getBrokerConfig().getBrokerName(),
+                master2With3Replicas.getBrokerConfig().getBrokerId()));
+        System.out.printf("%s Remove master2%n", LocalDateTime.now());
+
+
+        DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP);
+        consumer.subscribe(topic, "*");
+        
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        List<String> consumedMessages = new CopyOnWriteArrayList<>();
+        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, 
context) -> {
+            msgs.forEach(msg -> {
+                System.out.println("receive msg id: " + msg.getMsgId());
+                consumedMessages.add(msg.getMsgId());
+            });
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        consumer.setClientRebalance(false);
+        consumer.start();
+
+        await().atMost(Duration.ofMinutes(2)).until(() -> 
consumedMessages.size() >= MESSAGE_COUNT);
+        consumer.shutdown();
+        System.out.printf("%s %d messages consumed%n", LocalDateTime.now(), 
consumedMessages.size());
+
+        List<String> retryMsgList = new CopyOnWriteArrayList<>();
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+        pushConsumer.subscribe(retryTopic, "*");
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) 
(msgs, context) -> {
+            for (MessageExt msg : msgs) {
+                System.out.printf("receive retry msg: %s %n", 
msg.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
+                retryMsgList.add(new String(msg.getBody()));
+            }
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        pushConsumer.start();
+
+        System.out.printf("%s wait for ack revive%n", LocalDateTime.now());
+        Thread.sleep(10000);
+
+        assertThat(retryMsgList.size()).isEqualTo(0);
+
+        cancelIsolatedBroker(master1With3Replicas);
+        System.out.printf("%s Cancel isolate master1%n", LocalDateTime.now());
+
+        //Add back master
+        master2With3Replicas = 
brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(), 
master2With3Replicas.getMessageStoreConfig());
+        master2With3Replicas.start();
+        cancelIsolatedBroker(master2With3Replicas);
+        System.out.printf("%s Add back master2%n", LocalDateTime.now());
+
+        awaitUntilSlaveOK();
+
+        System.out.printf("%s wait for ack revive%n", LocalDateTime.now());
+        Thread.sleep(10000);
+
+        assertThat(retryMsgList.size()).isEqualTo(0);
+
+        System.out.printf("%s shutting down pushConsumer%n", 
LocalDateTime.now());
+        pushConsumer.shutdown();
+    }
+
+    @Test
+    public void testRemoteActing_notAckSlave_getFromLocal() throws Exception {
+        String topic = PopSlaveActingMasterIT.class.getSimpleName() + 
random.nextInt(65535);
+        createTopic(topic);
+        this.switchPop(topic);
+
+        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
CONSUME_GROUP);
+        createTopic(retryTopic);
+
+        
producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
+
+        Set<String> sendToIsolateMsgSet = new HashSet<>();
+        MessageQueue messageQueue = new MessageQueue(topic, 
master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+        int sendSuccess = 0;
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = new Message(topic, MESSAGE_BODY);
+            SendResult sendResult = producer.send(msg, messageQueue);
+            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+                sendToIsolateMsgSet.add(new String(msg.getBody()));
+                sendSuccess++;
+            }
+        }
+
+        System.out.printf("send success %d%n", sendSuccess);
+        final int finalSendSuccess = sendSuccess;
+        await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >= 
MESSAGE_COUNT);
+
+        isolateBroker(master1With3Replicas);
+        System.out.printf("isolate master1%n");
+
+        isolateBroker(master2With3Replicas);
+        brokerContainer2.removeBroker(new BrokerIdentity(
+                master2With3Replicas.getBrokerConfig().getBrokerClusterName(),
+                master2With3Replicas.getBrokerConfig().getBrokerName(),
+                master2With3Replicas.getBrokerConfig().getBrokerId()));
+        System.out.printf("Remove master2%n");
+
+
+        DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP);
+        consumer.subscribe(topic, "*");
+        
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        List<String> consumedMessages = new CopyOnWriteArrayList<>();
+        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, 
context) -> {
+            msgs.forEach(msg -> {
+                System.out.println("receive msg id: " + msg.getMsgId());
+                consumedMessages.add(msg.getMsgId());
+            });
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+        });
+        consumer.setClientRebalance(false);
+        consumer.start();
+
+        await().atMost(Duration.ofMinutes(3)).until(() -> 
consumedMessages.size() >= MESSAGE_COUNT);
+        consumer.shutdown();
+
+
+        List<String> retryMsgList = new CopyOnWriteArrayList<>();
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+        pushConsumer.subscribe(retryTopic, "*");
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) 
(msgs, context) -> {
+            for (MessageExt msg : msgs) {
+                System.out.printf("receive retry msg: %s%n", 
msg.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
+                retryMsgList.add(new String(msg.getBody()));
+            }
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        pushConsumer.start();
+
+        System.out.printf("wait for ack revive%n");
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> {
+            if (retryMsgList.size() < MESSAGE_COUNT) {
+                System.out.println("check FAILED: retryMsgList.size=" + 
retryMsgList.size() + " less than " + MESSAGE_COUNT);
+                return false;
+            }
+
+            for (String msgBodyString : retryMsgList) {
+                if (!sendToIsolateMsgSet.contains(msgBodyString)) {
+                    System.out.println("check FAILED: sendToIsolateMsgSet 
doesn't contain: " + msgBodyString);
+                    return false;
+                }
+            }
+            return true;
+        });
+
+        System.out.printf("receive retry msg as expected%n");
+
+        cancelIsolatedBroker(master1With3Replicas);
+        System.out.printf("Cancel isolate master1%n");
+
+        //Add back master
+        master2With3Replicas = 
brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(), 
master2With3Replicas.getMessageStoreConfig());
+        master2With3Replicas.start();
+        cancelIsolatedBroker(master2With3Replicas);
+        System.out.printf("Add back master2%n");
+
+        awaitUntilSlaveOK();
+        pushConsumer.shutdown();
+    }
+
+    @Test
+    public void testRemoteActing_notAckSlave_getFromRemote() throws Exception {
+        String topic = PopSlaveActingMasterIT.class.getSimpleName() + 
random.nextInt(65535);
+        createTopic(topic);
+        this.switchPop(topic);
+        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
CONSUME_GROUP);
+        createTopic(retryTopic);
+
+        
producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
+
+        Set<String> sendToIsolateMsgSet = new HashSet<>();
+        MessageQueue messageQueue = new MessageQueue(topic, 
master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+        int sendSuccess = 0;
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = new Message(topic, MESSAGE_BODY);
+            SendResult sendResult = producer.send(msg, messageQueue);
+            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+                sendToIsolateMsgSet.add(new String(msg.getBody()));
+                sendSuccess++;
+            }
+        }
+
+        System.out.printf("send success %d%n", sendSuccess);
+        final int finalSendSuccess = sendSuccess;
+        await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >= 
MESSAGE_COUNT);
+
+        isolateBroker(master1With3Replicas);
+        System.out.printf("isolate master1%n");
+
+        isolateBroker(master2With3Replicas);
+        brokerContainer2.removeBroker(new BrokerIdentity(
+                master2With3Replicas.getBrokerConfig().getBrokerClusterName(),
+                master2With3Replicas.getBrokerConfig().getBrokerName(),
+                master2With3Replicas.getBrokerConfig().getBrokerId()));
+        System.out.printf("Remove master2%n");
+
+        BrokerController slave1InBrokerContainer3 = 
getSlaveFromContainerByName(brokerContainer3, 
master1With3Replicas.getBrokerConfig().getBrokerName());
+        isolateBroker(slave1InBrokerContainer3);
+        brokerContainer3.removeBroker(new BrokerIdentity(
+                
slave1InBrokerContainer3.getBrokerConfig().getBrokerClusterName(),
+                slave1InBrokerContainer3.getBrokerConfig().getBrokerName(),
+                slave1InBrokerContainer3.getBrokerConfig().getBrokerId()));
+        System.out.printf("Remove slave1 form container3%n");
+
+        DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP);
+        consumer.subscribe(topic, "*");
+        
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        List<String> consumedMessages = new CopyOnWriteArrayList<>();
+        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, 
context) -> {
+            msgs.forEach(msg -> {
+                System.out.println("receive msg id: " + msg.getMsgId());
+                consumedMessages.add(msg.getMsgId());
+            });
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+        });
+        consumer.setClientRebalance(false);
+        consumer.start();
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> 
consumedMessages.size() >= MESSAGE_COUNT);
+        System.out.printf("%s pop receive msg count: %d%n", 
LocalDateTime.now(), consumedMessages.size());
+        consumer.shutdown();
+
+
+        List<String> retryMsgList = new CopyOnWriteArrayList<>();
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+        pushConsumer.subscribe(retryTopic, "*");
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) 
(msgs, context) -> {
+            for (MessageExt msg : msgs) {
+                System.out.printf("receive retry msg: %s%n", 
msg.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
+                retryMsgList.add(new String(msg.getBody()));
+            }
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        pushConsumer.start();
+
+        System.out.printf("wait for ack revive%n");
+        Thread.sleep(10000);
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> {
+            if (retryMsgList.size() < MESSAGE_COUNT) {
+                return false;
+            }
+
+            for (String msgBodyString : retryMsgList) {
+                if (!sendToIsolateMsgSet.contains(msgBodyString)) {
+                    return false;
+                }
+            }
+            return true;
+        });
+
+        System.out.printf("receive retry msg as expected%n");
+
+        cancelIsolatedBroker(master1With3Replicas);
+        System.out.printf("Cancel isolate master1%n");
+
+        //Add back master
+        master2With3Replicas = 
brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(), 
master2With3Replicas.getMessageStoreConfig());
+        master2With3Replicas.start();
+        cancelIsolatedBroker(master2With3Replicas);
+        System.out.printf("Add back master2%n");
+
+        //Add back slave1 to container3
+        slave1InBrokerContainer3 = 
brokerContainer3.addBroker(slave1InBrokerContainer3.getBrokerConfig(), 
slave1InBrokerContainer3.getMessageStoreConfig());
+        slave1InBrokerContainer3.start();
+        cancelIsolatedBroker(slave1InBrokerContainer3);
+        System.out.printf("Add back slave1 to container3%n");
+
+        awaitUntilSlaveOK();
+        pushConsumer.shutdown();
+    }
+
+    private void switchPop(String topic) throws Exception {
+        for (BrokerContainer brokerContainer : brokerContainerList) {
+            for (InnerBrokerController master : 
brokerContainer.getMasterBrokers()) {
+                String brokerAddr = master.getBrokerAddr();
+                defaultMQAdminExt.setMessageRequestMode(brokerAddr, topic, 
CONSUME_GROUP, MessageRequestMode.POP, 8, 60_000);
+            }
+            for (InnerSalveBrokerController slave : 
brokerContainer.getSlaveBrokers()) {
+                defaultMQAdminExt.setMessageRequestMode(slave.getBrokerAddr(), 
topic, CONSUME_GROUP, MessageRequestMode.POP, 8, 60_000);
+            }
+        }
+
+    }
+
+}
\ No newline at end of file

Reply via email to