This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new e93536a Add IT Test for static topic
e93536a is described below
commit e93536a9aeb42f9695767fe28d43b1a229265a67
Author: dongeforever <[email protected]>
AuthorDate: Tue Nov 23 20:21:28 2021 +0800
Add IT Test for static topic
---
.../org/apache/rocketmq/test/base/BaseConf.java | 16 +
.../apache/rocketmq/test/smoke/LogicalQueueIT.java | 1170 --------------------
.../apache/rocketmq/test/smoke/StaticTopicIT.java | 76 ++
.../tools/admin/DefaultMQAdminExtTest.java | 8 +-
4 files changed, 93 insertions(+), 1177 deletions(-)
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 79469e1..2d59f31 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -19,8 +19,10 @@ package org.apache.rocketmq.test.base;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -120,6 +122,13 @@ public class BaseConf {
return group;
}
+ public static DefaultMQAdminExt getAdmin(String nsAddr) {
+ final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(500);
+ mqAdminExt.setNamesrvAddr(nsAddr);
+ mqClients.add(mqAdminExt);
+ return mqAdminExt;
+ }
+
public static RMQNormalProducer getProducer(String nsAddr, String topic) {
return getProducer(nsAddr, topic, false);
}
@@ -197,6 +206,13 @@ public class BaseConf {
shutdown(mqClients);
}
+ public static Set<String> getBrokers() {
+ Set<String> brokers = new HashSet<>();
+ brokers.add(broker1Name);
+ brokers.add(broker2Name);
+ return brokers;
+ }
+
public static void shutdown(List<Object> mqClients) {
mqClients.forEach(mqClient -> ForkJoinPool.commonPool().execute(() -> {
if (mqClient instanceof AbstractMQProducer) {
diff --git
a/test/src/test/java/org/apache/rocketmq/test/smoke/LogicalQueueIT.java
b/test/src/test/java/org/apache/rocketmq/test/smoke/LogicalQueueIT.java
deleted file mode 100644
index b1e9013..0000000
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/LogicalQueueIT.java
+++ /dev/null
@@ -1,1170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.test.smoke;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
-import org.apache.rocketmq.client.consumer.PullCallback;
-import org.apache.rocketmq.client.consumer.PullResult;
-import org.apache.rocketmq.client.consumer.PullStatus;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendCallback;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.SendResultForLogicalQueue;
-import org.apache.rocketmq.common.MQVersion;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.constant.PermName;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
-import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.namesrv.NamesrvController;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.store.CommitLog;
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.MappedFileQueue;
-import org.apache.rocketmq.test.base.BaseConf;
-import org.apache.rocketmq.test.base.IntegrationTestBase;
-import org.apache.rocketmq.test.util.MQRandomUtils;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
-import
org.apache.rocketmq.tools.command.logicalqueue.MigrateTopicLogicalQueueCommand;
-import
org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand;
-import
org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueNumCommand;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.FixMethodOrder;
-import org.junit.Test;
-import org.junit.runners.MethodSorters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.Optional.ofNullable;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.awaitility.Awaitility.await;
-import static org.awaitility.Awaitility.waitAtMost;
-
-@FixMethodOrder(MethodSorters.NAME_ASCENDING)
-public class LogicalQueueIT {
- private static final Logger logger =
LoggerFactory.getLogger(LogicalQueueIT.class);
- public static String nsAddr;
- private static String broker1Name;
- private static String broker2Name;
- private static String clusterName;
- private static int brokerNum;
- private final static int QUEUE_NUMBERS = 8;
- private static NamesrvController namesrvController;
- private static BrokerController brokerController1;
- private static BrokerController brokerController2;
- private static Map<String, BrokerController> brokerControllerMap;
- private final static List<Object> mqClients = new ArrayList<>();
-
- private static DefaultMQProducer producer;
- private static DefaultMQPullConsumer consumer;
- private static DefaultMQAdminExt mqAdminExt;
- private static volatile String topic = null;
- private static final String placeholderTopic = "placeholder";
- private static final int MSG_SENT_TIMES = 3;
- private static final int COMMIT_LOG_FILE_SIZE = 512 * 1024;
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- System.setProperty(RemotingCommand.REMOTING_VERSION_KEY,
Integer.toString(MQVersion.CURRENT_VERSION));
- namesrvController = IntegrationTestBase.createAndStartNamesrv();
- nsAddr = "127.0.0.1:" +
namesrvController.getNettyServerConfig().getListenPort();
-
- int oldCommitLogSize = IntegrationTestBase.COMMIT_LOG_SIZE;
- IntegrationTestBase.COMMIT_LOG_SIZE = COMMIT_LOG_FILE_SIZE;
- brokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr);
- brokerController2 = IntegrationTestBase.createAndStartBroker(nsAddr);
- IntegrationTestBase.COMMIT_LOG_SIZE = oldCommitLogSize;
-
- clusterName =
brokerController1.getBrokerConfig().getBrokerClusterName();
- broker1Name = brokerController1.getBrokerConfig().getBrokerName();
- broker2Name = brokerController2.getBrokerConfig().getBrokerName();
- brokerNum = 2;
- brokerControllerMap = ImmutableList.of(brokerController1,
brokerController2).stream().collect(Collectors.toMap(input ->
input.getBrokerConfig().getBrokerName(), Function.identity()));
-
- BaseConf.waitBrokerRegistered(nsAddr, clusterName);
-
- producer = new
DefaultMQProducer(MQRandomUtils.getRandomConsumerGroup());
- mqClients.add(producer);
- producer.setNamesrvAddr(nsAddr);
- producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
- producer.setSendMsgTimeout(1000);
- producer.start();
-
- consumer = new DefaultMQPullConsumer(BaseConf.initConsumerGroup());
- mqClients.add(consumer);
- consumer.setNamesrvAddr(nsAddr);
- consumer.setConsumerPullTimeoutMillis(1000);
- consumer.start();
-
- mqAdminExt = new DefaultMQAdminExt(1000);
- mqClients.add(mqAdminExt);
- mqAdminExt.setNamesrvAddr(nsAddr);
- mqAdminExt.start();
-
- mqAdminExt.createTopic(clusterName, placeholderTopic, 1);
- }
-
- @AfterClass
- public static void afterClass() {
- BaseConf.shutdown(mqClients);
- brokerControllerMap.forEach((s, brokerController) ->
brokerController.shutdown());
- ofNullable(namesrvController).ifPresent(obj ->
ForkJoinPool.commonPool().execute(obj::shutdown));
- }
-
- @Before
- public void setUp() throws Exception {
- topic = "tt-" + MQRandomUtils.getRandomTopic();
- logger.info("use topic: {}", topic);
- mqAdminExt.createTopic(clusterName, topic, QUEUE_NUMBERS);
-
assertThat(mqAdminExt.examineTopicRouteInfo(topic).getBrokerDatas()).hasSize(brokerNum);
- await().atMost(5, TimeUnit.SECONDS).until(() ->
!mqAdminExt.examineTopicStats(topic).getOffsetTable().isEmpty());
-
- consumer.setRegisterTopics(Collections.singleton(topic));
- // consumer.setMessageQueueListener &
consumer.registerMessageQueueListener are useless in DefaultMQPullConsumer,
they will never work, so do not need to test it
-
- new UpdateTopicLogicalQueueMappingCommand().execute(mqAdminExt, topic,
brokerControllerMap.values().stream().map(BrokerController::getBrokerAddr).collect(Collectors.toSet()));
- }
-
- private static String getCurrentMethodName() {
- // 0: getStackTrace
- // 1: getCurrentMethodName
- // 2: __realMethod__
- return Thread.currentThread().getStackTrace()[2].getMethodName();
- }
-
- @Test
- public void test001_SendPullSync() throws Exception {
- String methodName = getCurrentMethodName();
-
- List<MessageQueue> publishMessageQueues =
producer.fetchPublishMessageQueues(topic);
- assertThat(publishMessageQueues).hasSize(brokerNum * QUEUE_NUMBERS);
- Set<Integer> queueIds = IntStream.range(0, brokerNum *
QUEUE_NUMBERS).boxed().collect(Collectors.toSet());
- for (MessageQueue messageQueue : publishMessageQueues) {
-
assertThat(messageQueue.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
- assertThat(queueIds.remove(messageQueue.getQueueId())).isTrue();
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- SendResult sendResult = producer.send(new Message(topic,
String.format(Locale.ENGLISH, "%s-sync-%d-%d", methodName,
messageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)), messageQueue);
-
assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(messageQueue.getBrokerName());
-
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(messageQueue.getQueueId());
- }
- }
- assertThat(queueIds).isEmpty();
-
- List<MessageQueue> subscribeMessageQueues =
consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList());
- assertThat(subscribeMessageQueues).hasSize(brokerNum * QUEUE_NUMBERS);
-
subscribeMessageQueues.sort(Comparator.comparingInt(MessageQueue::getQueueId));
- queueIds.addAll(IntStream.range(0, brokerNum *
QUEUE_NUMBERS).boxed().collect(Collectors.toSet()));
- for (MessageQueue messageQueue : subscribeMessageQueues) {
-
assertThat(messageQueue.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
- assertThat(queueIds.remove(messageQueue.getQueueId())).isTrue();
- long offset = mqAdminExt.minOffset(messageQueue);
- PullResult pullResult = consumer.pull(messageQueue, "*", offset,
10);
- assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
- assertThat(pullResult.getMsgFoundList()).hasSize(MSG_SENT_TIMES);
- offset = -1;
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- MessageExt msg = pullResult.getMsgFoundList().get(i);
-
assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
-
assertThat(msg.getQueueId()).isEqualTo(messageQueue.getQueueId());
- assertThat(new String(msg.getBody(),
StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH,
"%s-sync-%d-%d", methodName, messageQueue.getQueueId(), i));
- if (i > 0) {
- assertThat(msg.getQueueOffset()).isEqualTo(offset + i);
- } else {
- offset = msg.getQueueOffset();
- }
- }
- assertThat(maxOffsetUncommitted(messageQueue)).isEqualTo(offset +
MSG_SENT_TIMES);
- }
- assertThat(queueIds).isEmpty();
- }
-
- @Test
- public void test002_SendPullAsync() throws Exception {
- String methodName = getCurrentMethodName();
-
- List<MessageQueue> publishMessageQueues =
producer.fetchPublishMessageQueues(topic);
- for (MessageQueue messageQueue : publishMessageQueues) {
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- CompletableFuture<SendResult> future = new
CompletableFuture<>();
- producer.send(new Message(topic, String.format(Locale.ENGLISH,
"%s-async-%d-%d", methodName, messageQueue.getQueueId(),
i).getBytes(StandardCharsets.UTF_8)), messageQueue, new SendCallback() {
- @Override public void onSuccess(SendResult sendResult) {
- future.complete(sendResult);
- }
-
- @Override public void onException(Throwable e) {
- future.completeExceptionally(e);
- }
- });
- SendResult sendResult = future.get();
-
assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(messageQueue.getBrokerName());
-
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(messageQueue.getQueueId());
- }
- }
-
- List<MessageQueue> subscribeMessageQueues =
consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList());
- for (MessageQueue messageQueue : subscribeMessageQueues) {
- long offset = mqAdminExt.minOffset(messageQueue);
- CompletableFuture<PullResult> future = new CompletableFuture<>();
- consumer.pull(messageQueue, "*", offset, 10, new PullCallback() {
- @Override public void onSuccess(PullResult pullResult) {
- future.complete(pullResult);
- }
-
- @Override public void onException(Throwable e) {
- future.completeExceptionally(e);
- }
- });
- PullResult pullResult = future.get();
- assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
- assertThat(pullResult.getMsgFoundList()).hasSize(MSG_SENT_TIMES);
- offset = -1;
- Iterator<MessageExt> it = pullResult.getMsgFoundList().iterator();
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- MessageExt msg = it.next();
-
assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
-
assertThat(msg.getQueueId()).isEqualTo(messageQueue.getQueueId());
- assertThat(new String(msg.getBody(),
StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH,
"%s-async-%d-%d", methodName, messageQueue.getQueueId(), i));
- if (i > 0) {
- assertThat(msg.getQueueOffset()).isEqualTo(offset + i);
- } else {
- offset = msg.getQueueOffset();
- }
- }
- }
- }
-
- @Test
- public void test003_MigrateOnceWithoutData() throws Exception {
- final String methodName = getCurrentMethodName();
-
- final int logicalQueueIdx = 1;
-
- TopicRouteData topicRouteInfo =
mqAdminExt.examineTopicRouteInfo(topic);
- List<LogicalQueueRouteData> logicalQueueRouteDataList1 =
topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx);
- LogicalQueueRouteData lastLogicalQueueRouteData1 =
logicalQueueRouteDataList1.get(logicalQueueRouteDataList1.size() - 1);
- String newBrokerName;
- if (lastLogicalQueueRouteData1.getBrokerName().equals(broker1Name)) {
- newBrokerName = broker2Name;
- } else {
- newBrokerName = broker1Name;
- }
-
- MessageQueue migratedMessageQueue = new MessageQueue(topic,
MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, logicalQueueIdx);
-
- new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic,
logicalQueueIdx, newBrokerName, null);
-
- topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
- assertThat(topicRouteInfo.getLogicalQueuesInfo()).isNotNull();
- for (Map.Entry<Integer, List<LogicalQueueRouteData>> entry :
topicRouteInfo.getLogicalQueuesInfo().entrySet()) {
- List<LogicalQueueRouteData> logicalQueueRouteDataList2 =
entry.getValue();
- if (entry.getKey() == logicalQueueIdx) {
-
assertThat(logicalQueueRouteDataList2).hasSize(logicalQueueRouteDataList1.size()
+ 1);
- LogicalQueueRouteData lastLogicalQueueRouteData2 =
logicalQueueRouteDataList2.get(logicalQueueRouteDataList2.size() - 2);
-
assertThat(lastLogicalQueueRouteData2.getMessageQueue()).isEqualTo(lastLogicalQueueRouteData1.getMessageQueue());
-
assertThat(lastLogicalQueueRouteData2.getOffsetMax()).isGreaterThanOrEqualTo(0L);
-
assertThat(lastLogicalQueueRouteData2.getMessagesCount()).isEqualTo(0L);
- assertThat(lastLogicalQueueRouteData2.isWritable()).isFalse();
- assertThat(lastLogicalQueueRouteData2.isReadable()).isFalse();
- assertThat(lastLogicalQueueRouteData2.isExpired()).isTrue();
-
assertThat(lastLogicalQueueRouteData2.getLogicalQueueDelta()).isEqualTo(0L);
-
- LogicalQueueRouteData lastLogicalQueueRouteData3 =
logicalQueueRouteDataList2.get(logicalQueueRouteDataList2.size() - 1);
-
assertThat(lastLogicalQueueRouteData3.getBrokerName()).isEqualTo(newBrokerName);
-
assertThat(lastLogicalQueueRouteData3.getOffsetMax()).isLessThan(0L);
- assertThat(lastLogicalQueueRouteData3.isWritable()).isTrue();
- assertThat(lastLogicalQueueRouteData3.isReadable()).isTrue();
- assertThat(lastLogicalQueueRouteData3.isExpired()).isFalse();
-
assertThat(lastLogicalQueueRouteData3.getLogicalQueueDelta()).isEqualTo(0L);
- } else {
- assertThat(logicalQueueRouteDataList2).hasSize(1);
- LogicalQueueRouteData logicalQueueRouteData =
logicalQueueRouteDataList2.get(0);
-
assertThat(logicalQueueRouteData.getOffsetMax()).isLessThan(0L);
- assertThat(logicalQueueRouteData.isWritable()).isTrue();
- assertThat(logicalQueueRouteData.isReadable()).isTrue();
- assertThat(logicalQueueRouteData.isExpired()).isFalse();
-
assertThat(logicalQueueRouteData.getLogicalQueueDelta()).isEqualTo(0L);
- }
- }
-
- List<MessageQueue> subscribeMessageQueues =
consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList());
- assertThat(subscribeMessageQueues).hasSize(brokerNum * QUEUE_NUMBERS);
- for (MessageQueue mq : subscribeMessageQueues) {
- assertThat(mqAdminExt.minOffset(mq)).isEqualTo(0L);
- }
-
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- SendResult sendResult = producer.send(new Message(topic,
String.format(Locale.ENGLISH, "%s-sync-%d-%d", methodName,
migratedMessageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)),
migratedMessageQueue);
-
assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(migratedMessageQueue.getBrokerName());
-
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(migratedMessageQueue.getQueueId());
- SendResultForLogicalQueue sendResult2 =
(SendResultForLogicalQueue) sendResult;
-
assertThat(sendResult2.getOrigBrokerName()).isEqualTo(newBrokerName);
- assertThat(sendResult2.getOrigQueueId()).isEqualTo(QUEUE_NUMBERS);
- }
-
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- CompletableFuture<SendResult> future = new CompletableFuture<>();
- producer.send(new Message(topic, String.format(Locale.ENGLISH,
"%s-async-%d-%d", methodName, migratedMessageQueue.getQueueId(),
i).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue, new SendCallback() {
- @Override public void onSuccess(SendResult sendResult) {
- future.complete(sendResult);
- }
-
- @Override public void onException(Throwable e) {
- future.completeExceptionally(e);
- }
- });
- SendResult sendResult = future.get();
-
assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(migratedMessageQueue.getBrokerName());
-
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(migratedMessageQueue.getQueueId());
- SendResultForLogicalQueue sendResult2 =
(SendResultForLogicalQueue) sendResult;
-
assertThat(sendResult2.getOrigBrokerName()).isEqualTo(newBrokerName);
- assertThat(sendResult2.getOrigQueueId()).isEqualTo(QUEUE_NUMBERS);
- }
-
- assertThat(maxOffsetUncommitted(migratedMessageQueue)).isEqualTo(2 *
MSG_SENT_TIMES);
-
- waitAtMost(5, TimeUnit.SECONDS).until(() ->
mqAdminExt.maxOffset(migratedMessageQueue) == 2 * MSG_SENT_TIMES);
-
- PullResult pullResult = consumer.pull(migratedMessageQueue, "*", 0L, 2
* MSG_SENT_TIMES);
- assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
- assertThat(pullResult.getMinOffset()).isEqualTo(0);
- assertThat(pullResult.getMaxOffset()).isEqualTo(2 * MSG_SENT_TIMES);
- assertThat(pullResult.getNextBeginOffset()).isEqualTo(2 *
MSG_SENT_TIMES);
- List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
- assertThat(msgFoundList).hasSize(2 * MSG_SENT_TIMES);
- Iterator<MessageExt> it = pullResult.getMsgFoundList().iterator();
- long offset = 0L;
- for (String prefix : new String[] {"sync", "async"}) {
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- MessageExt msg = it.next();
-
assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
-
assertThat(msg.getQueueId()).isEqualTo(migratedMessageQueue.getQueueId());
- assertThat(new String(msg.getBody(),
StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-%s-%d-%d",
methodName, prefix, migratedMessageQueue.getQueueId(), i));
- assertThat(msg.getQueueOffset()).isEqualTo(offset);
- offset++;
- }
- }
-
- offset = pullResult.getNextBeginOffset();
- CompletableFuture<PullResult> future = new CompletableFuture<>();
- consumer.pull(migratedMessageQueue, "*", offset, 10, new
PullCallback() {
- @Override public void onSuccess(PullResult pullResult) {
- future.complete(pullResult);
- }
-
- @Override public void onException(Throwable e) {
- future.completeExceptionally(e);
- }
- });
- pullResult = future.get();
-
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG);
- assertThat(pullResult.getMinOffset()).isEqualTo(0);
- assertThat(pullResult.getMaxOffset()).isEqualTo(2 * MSG_SENT_TIMES);
- assertThat(pullResult.getNextBeginOffset()).isEqualTo(2 *
MSG_SENT_TIMES);
- assertThat(pullResult.getMsgFoundList()).isNull();
- }
-
- @Test
- public void test004_MigrateOnceWithData() throws Exception {
- final String methodName = getCurrentMethodName();
-
- final int logicalQueueIdx = 1;
-
- TopicRouteData topicRouteInfo =
mqAdminExt.examineTopicRouteInfo(topic);
- List<LogicalQueueRouteData> logicalQueueRouteDataList1 =
topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx);
- LogicalQueueRouteData lastLogicalQueueRouteData1 =
logicalQueueRouteDataList1.get(logicalQueueRouteDataList1.size() - 1);
- String newBrokerName;
- if (lastLogicalQueueRouteData1.getBrokerName().equals(broker1Name)) {
- newBrokerName = broker2Name;
- } else {
- newBrokerName = broker1Name;
- }
-
- MessageQueue migratedMessageQueue = new MessageQueue(topic,
MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, logicalQueueIdx);
-
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- SendResult sendResult = producer.send(new Message(topic,
String.format(Locale.ENGLISH, "%s-sync-%d-%d", methodName,
migratedMessageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)),
migratedMessageQueue);
-
assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(migratedMessageQueue.getBrokerName());
-
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(migratedMessageQueue.getQueueId());
- }
-
assertThat(maxOffsetUncommitted(migratedMessageQueue)).isEqualTo(MSG_SENT_TIMES);
-
- waitAtMost(5, TimeUnit.SECONDS).until(() ->
mqAdminExt.maxOffset(migratedMessageQueue) == MSG_SENT_TIMES);
-
- {
- long offset = 0L;
- PullResult pullResult = consumer.pull(migratedMessageQueue, "*",
offset, 2 * MSG_SENT_TIMES);
- assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
- assertThat(pullResult.getMinOffset()).isEqualTo(0);
- assertThat(pullResult.getMaxOffset()).isEqualTo(MSG_SENT_TIMES);
-
assertThat(pullResult.getNextBeginOffset()).isEqualTo(MSG_SENT_TIMES);
- List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
- assertThat(msgFoundList).hasSize(MSG_SENT_TIMES);
- Iterator<MessageExt> it = pullResult.getMsgFoundList().iterator();
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- MessageExt msg = it.next();
-
assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
-
assertThat(msg.getQueueId()).isEqualTo(migratedMessageQueue.getQueueId());
- assertThat(new String(msg.getBody(),
StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH,
"%s-sync-%d-%d", methodName, migratedMessageQueue.getQueueId(), i));
- assertThat(msg.getQueueOffset()).isEqualTo(offset);
- offset++;
- }
- }
-
- new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic,
logicalQueueIdx, newBrokerName, null);
-
- topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
- assertThat(topicRouteInfo.getLogicalQueuesInfo()).isNotNull();
- for (Map.Entry<Integer, List<LogicalQueueRouteData>> entry :
topicRouteInfo.getLogicalQueuesInfo().entrySet()) {
- List<LogicalQueueRouteData> logicalQueueRouteDataList2 =
entry.getValue();
- if (entry.getKey() == logicalQueueIdx) {
-
assertThat(logicalQueueRouteDataList2).hasSize(logicalQueueRouteDataList1.size()
+ 1);
- LogicalQueueRouteData lastLogicalQueueRouteData2 =
logicalQueueRouteDataList2.get(logicalQueueRouteDataList2.size() - 2);
-
assertThat(lastLogicalQueueRouteData2.getMessageQueue()).isEqualTo(lastLogicalQueueRouteData1.getMessageQueue());
-
assertThat(lastLogicalQueueRouteData2.getOffsetMax()).isGreaterThanOrEqualTo(0L);
-
assertThat(lastLogicalQueueRouteData2.getMessagesCount()).isEqualTo(MSG_SENT_TIMES);
- assertThat(lastLogicalQueueRouteData2.isWritable()).isFalse();
- assertThat(lastLogicalQueueRouteData2.isReadable()).isTrue();
- assertThat(lastLogicalQueueRouteData2.isExpired()).isFalse();
-
assertThat(lastLogicalQueueRouteData2.getLogicalQueueDelta()).isEqualTo(0L);
-
- LogicalQueueRouteData lastLogicalQueueRouteData3 =
logicalQueueRouteDataList2.get(logicalQueueRouteDataList2.size() - 1);
-
assertThat(lastLogicalQueueRouteData3.getBrokerName()).isEqualTo(newBrokerName);
-
assertThat(lastLogicalQueueRouteData3.getOffsetMax()).isLessThan(0L);
- assertThat(lastLogicalQueueRouteData3.isWritable()).isTrue();
- assertThat(lastLogicalQueueRouteData3.isReadable()).isTrue();
- assertThat(lastLogicalQueueRouteData3.isExpired()).isFalse();
-
assertThat(lastLogicalQueueRouteData3.getLogicalQueueDelta()).isEqualTo(MSG_SENT_TIMES);
- } else {
- assertThat(logicalQueueRouteDataList2).hasSize(1);
- LogicalQueueRouteData logicalQueueRouteData =
logicalQueueRouteDataList2.get(0);
-
assertThat(logicalQueueRouteData.getOffsetMax()).isLessThan(0L);
- assertThat(logicalQueueRouteData.isWritable()).isTrue();
- assertThat(logicalQueueRouteData.isReadable()).isTrue();
- assertThat(logicalQueueRouteData.isExpired()).isFalse();
-
assertThat(logicalQueueRouteData.getLogicalQueueDelta()).isEqualTo(0L);
- }
- }
- assertThat(migratedMessageQueue).isNotNull();
-
- List<MessageQueue> subscribeMessageQueues =
consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList());
- assertThat(subscribeMessageQueues).hasSize(brokerNum * QUEUE_NUMBERS);
- for (MessageQueue mq : subscribeMessageQueues) {
- assertThat(mqAdminExt.minOffset(mq)).isEqualTo(0L);
- }
-
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- CompletableFuture<SendResult> future = new CompletableFuture<>();
- producer.send(new Message(topic, String.format(Locale.ENGLISH,
"%s-async-%d-%d", methodName, migratedMessageQueue.getQueueId(),
i).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue, new SendCallback() {
- @Override public void onSuccess(SendResult sendResult) {
- future.complete(sendResult);
- }
-
- @Override public void onException(Throwable e) {
- future.completeExceptionally(e);
- }
- });
- SendResult sendResult = future.get();
-
assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(migratedMessageQueue.getBrokerName());
-
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(migratedMessageQueue.getQueueId());
- SendResultForLogicalQueue sendResult2 =
(SendResultForLogicalQueue) sendResult;
-
assertThat(sendResult2.getOrigBrokerName()).isEqualTo(newBrokerName);
- assertThat(sendResult2.getOrigQueueId()).isEqualTo(QUEUE_NUMBERS);
- }
-
- assertThat(maxOffsetUncommitted(migratedMessageQueue)).isEqualTo(2 *
MSG_SENT_TIMES);
-
- waitAtMost(5, TimeUnit.SECONDS).until(() ->
mqAdminExt.maxOffset(migratedMessageQueue) == 2 * MSG_SENT_TIMES);
-
- long offset = 0L;
- PullResult pullResult = consumer.pull(migratedMessageQueue, "*",
offset, 2 * MSG_SENT_TIMES);
- assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
- assertThat(pullResult.getMinOffset()).isEqualTo(0);
- assertThat(pullResult.getMaxOffset()).isEqualTo(MSG_SENT_TIMES);
- assertThat(pullResult.getNextBeginOffset()).isEqualTo(MSG_SENT_TIMES);
- List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
- assertThat(msgFoundList).hasSize(MSG_SENT_TIMES);
- Iterator<MessageExt> it = pullResult.getMsgFoundList().iterator();
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- MessageExt msg = it.next();
-
assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
-
assertThat(msg.getQueueId()).isEqualTo(migratedMessageQueue.getQueueId());
- assertThat(new String(msg.getBody(),
StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH,
"%s-sync-%d-%d", methodName, migratedMessageQueue.getQueueId(), i));
- assertThat(msg.getQueueOffset()).isEqualTo(offset);
- offset++;
- }
-
- offset = pullResult.getNextBeginOffset();
- CompletableFuture<PullResult> pullResultFuture = new
CompletableFuture<>();
- consumer.pull(migratedMessageQueue, "*", offset, 2 * MSG_SENT_TIMES,
new PullCallback() {
- @Override public void onSuccess(PullResult pullResult) {
- pullResultFuture.complete(pullResult);
- }
-
- @Override public void onException(Throwable e) {
- pullResultFuture.completeExceptionally(e);
- }
- });
- pullResult = pullResultFuture.get();
- assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
- assertThat(pullResult.getMinOffset()).isEqualTo(MSG_SENT_TIMES);
- assertThat(pullResult.getMaxOffset()).isEqualTo(2 * MSG_SENT_TIMES);
- assertThat(pullResult.getNextBeginOffset()).isEqualTo(2 *
MSG_SENT_TIMES);
- msgFoundList = pullResult.getMsgFoundList();
- assertThat(msgFoundList).hasSize(MSG_SENT_TIMES);
- it = pullResult.getMsgFoundList().iterator();
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- MessageExt msg = it.next();
-
assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
-
assertThat(msg.getQueueId()).isEqualTo(migratedMessageQueue.getQueueId());
- assertThat(new String(msg.getBody(),
StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH,
"%s-async-%d-%d", methodName, migratedMessageQueue.getQueueId(), i));
- assertThat(msg.getQueueOffset()).isEqualTo(offset);
- offset++;
- }
-
- offset = pullResult.getNextBeginOffset();
- pullResult = consumer.pull(migratedMessageQueue, "*", offset, 10);
-
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG);
- assertThat(pullResult.getMinOffset()).isEqualTo(MSG_SENT_TIMES);
- assertThat(pullResult.getMaxOffset()).isEqualTo(2 * MSG_SENT_TIMES);
- assertThat(pullResult.getNextBeginOffset()).isEqualTo(2 *
MSG_SENT_TIMES);
- assertThat(pullResult.getMsgFoundList()).isNull();
- }
-
- @Test
- public void test005_MigrateWithDataBackAndForth() throws Exception {
- final String methodName = getCurrentMethodName();
-
- final int logicalQueueIdx = 1;
-
- MessageQueue migratedMessageQueue = new MessageQueue(topic,
MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, logicalQueueIdx);
-
- BrokerController brokerController;
-
- TopicRouteData topicRouteInfo =
mqAdminExt.examineTopicRouteInfo(topic);
- LogicalQueueRouteData lastLogicalQueueRouteData;
- {
- List<LogicalQueueRouteData> logicalQueueRouteDataList =
topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx);
- lastLogicalQueueRouteData =
logicalQueueRouteDataList.get(logicalQueueRouteDataList.size() - 1);
- }
- final String fromBrokerName, toBrokerName, fromBrokerAddr,
toBrokerAddr;
- if (lastLogicalQueueRouteData.getBrokerName().equals(broker1Name)) {
- fromBrokerName = broker1Name;
- fromBrokerAddr = brokerController1.getBrokerAddr();
- toBrokerName = broker2Name;
- toBrokerAddr = brokerController2.getBrokerAddr();
- } else {
- fromBrokerName = broker2Name;
- fromBrokerAddr = brokerController2.getBrokerAddr();
- toBrokerName = broker1Name;
- toBrokerAddr = brokerController1.getBrokerAddr();
- }
-
- int msgIdx = 0;
-
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- SendResult sendResult = producer.send(new Message(topic,
String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx,
msgIdx++).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue);
- SendResultForLogicalQueue sendResult2 =
(SendResultForLogicalQueue) sendResult;
-
assertThat(sendResult2.getOrigBrokerName()).isEqualTo(fromBrokerName);
-
assertThat(sendResult2.getOrigQueueId()).isEqualTo(logicalQueueIdx);
- }
-
- rotateBrokerCommitLog(brokerControllerMap.get(fromBrokerName));
-
- new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic,
logicalQueueIdx, toBrokerName, null);
-
- {
- LogicalQueuesInfo info;
- List<LogicalQueueRouteData> logicalQueueRouteDataList;
- info = mqAdminExt.queryTopicLogicalQueueMapping(fromBrokerAddr,
topic);
- logicalQueueRouteDataList = info.get(logicalQueueIdx);
- assertThat(logicalQueueRouteDataList).hasSize(2);
- info = mqAdminExt.queryTopicLogicalQueueMapping(toBrokerAddr,
topic);
- logicalQueueRouteDataList = info.get(logicalQueueIdx);
- assertThat(logicalQueueRouteDataList).hasSize(1);
- }
-
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- SendResult sendResult = producer.send(new Message(topic,
String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx,
msgIdx++).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue);
- SendResultForLogicalQueue sendResult2 =
(SendResultForLogicalQueue) sendResult;
-
assertThat(sendResult2.getOrigBrokerName()).isEqualTo(toBrokerName);
- assertThat(sendResult2.getOrigQueueId()).isEqualTo(QUEUE_NUMBERS);
- }
-
- new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic,
logicalQueueIdx, fromBrokerName, null);
- // now will reuse queue with a ReadOnly one
-
- {
- LogicalQueuesInfo info;
- List<LogicalQueueRouteData> logicalQueueRouteDataList;
- info = mqAdminExt.queryTopicLogicalQueueMapping(fromBrokerAddr,
topic);
- logicalQueueRouteDataList = info.get(logicalQueueIdx);
- assertThat(logicalQueueRouteDataList).hasSize(3);
- info = mqAdminExt.queryTopicLogicalQueueMapping(toBrokerAddr,
topic);
- logicalQueueRouteDataList = info.get(logicalQueueIdx);
- assertThat(logicalQueueRouteDataList).hasSize(2);
- }
-
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- SendResult sendResult = producer.send(new Message(topic,
String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx,
msgIdx++).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue);
- SendResultForLogicalQueue sendResult2 =
(SendResultForLogicalQueue) sendResult;
-
assertThat(sendResult2.getOrigBrokerName()).isEqualTo(fromBrokerName);
-
assertThat(sendResult2.getOrigQueueId()).isEqualTo(logicalQueueIdx);
- }
-
- LogicalQueueRouteData logicalQueueRouteData1;
- LogicalQueueRouteData logicalQueueRouteData2;
- {
- List<LogicalQueueRouteData> logicalQueueRouteDataList;
- topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
- logicalQueueRouteDataList =
topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx);
- assertThat(logicalQueueRouteDataList).hasSize(3);
- logicalQueueRouteData1 = logicalQueueRouteDataList.get(0);
-
assertThat(logicalQueueRouteData1.getLogicalQueueDelta()).isEqualTo(0);
- assertThat(logicalQueueRouteData1.isReadable()).isTrue();
- assertThat(logicalQueueRouteData1.isWritable()).isFalse();
- assertThat(logicalQueueRouteData1.isExpired()).isFalse();
- assertThat(logicalQueueRouteData1.isWriteOnly()).isFalse();
-
assertThat(logicalQueueRouteData1.getBrokerName()).isEqualTo(fromBrokerName);
-
assertThat(logicalQueueRouteData1.getOffsetMax()).isGreaterThanOrEqualTo(0L);
-
assertThat(logicalQueueRouteData1.getMessagesCount()).isEqualTo(MSG_SENT_TIMES);
-
assertThat(logicalQueueRouteData1.getFirstMsgTimeMillis()).isGreaterThan(0L);
-
assertThat(logicalQueueRouteData1.getLastMsgTimeMillis()).isGreaterThan(0L);
- logicalQueueRouteData2 = logicalQueueRouteDataList.get(1);
-
assertThat(logicalQueueRouteData2.getLogicalQueueDelta()).isEqualTo(MSG_SENT_TIMES);
- assertThat(logicalQueueRouteData2.isReadable()).isTrue();
- assertThat(logicalQueueRouteData2.isWritable()).isFalse();
- assertThat(logicalQueueRouteData2.isExpired()).isFalse();
- assertThat(logicalQueueRouteData2.isWriteOnly()).isFalse();
-
assertThat(logicalQueueRouteData2.getBrokerName()).isEqualTo(toBrokerName);
-
assertThat(logicalQueueRouteData2.getOffsetMax()).isGreaterThanOrEqualTo(0L);
-
assertThat(logicalQueueRouteData2.getMessagesCount()).isEqualTo(MSG_SENT_TIMES);
-
assertThat(logicalQueueRouteData2.getFirstMsgTimeMillis()).isGreaterThan(0L);
-
assertThat(logicalQueueRouteData2.getLastMsgTimeMillis()).isGreaterThan(0L);
- LogicalQueueRouteData logicalQueueRouteData3 =
logicalQueueRouteDataList.get(2);
-
assertThat(logicalQueueRouteData3.getLogicalQueueDelta()).isEqualTo(2 *
MSG_SENT_TIMES);
- assertThat(logicalQueueRouteData3.isReadable()).isTrue();
- assertThat(logicalQueueRouteData3.isWritable()).isTrue();
- assertThat(logicalQueueRouteData3.isExpired()).isFalse();
- assertThat(logicalQueueRouteData3.isWriteOnly()).isFalse();
-
assertThat(logicalQueueRouteData3.getBrokerName()).isEqualTo(fromBrokerName);
- assertThat(logicalQueueRouteData3.getOffsetMax()).isLessThan(0L);
- }
-
- msgIdx = 0;
- forLoop:
- for (long offset = 0L; ; ) {
- PullResult pullResult = consumer.pull(migratedMessageQueue, "*",
offset, 3 * MSG_SENT_TIMES);
- switch (pullResult.getPullStatus()) {
- case NO_NEW_MSG:
- assertThat(offset).isGreaterThanOrEqualTo(3L *
MSG_SENT_TIMES);
- break forLoop;
- case OFFSET_ILLEGAL:
- offset = pullResult.getNextBeginOffset();
- break;
- default:
-
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
- assertThat(pullResult.getMsgFoundList()).isNotNull();
-
assertThat(pullResult.getMsgFoundList()).hasSize(MSG_SENT_TIMES);
- for (MessageExt msg : pullResult.getMsgFoundList()) {
- assertThat(new String(msg.getBody(),
StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d",
methodName, logicalQueueIdx, msgIdx));
- msgIdx++;
- assertThat(msg.getQueueOffset()).isEqualTo(offset);
- offset++;
- }
- offset = pullResult.getNextBeginOffset();
- break;
- }
- }
-
- waitAtMost(5, TimeUnit.SECONDS).until(() ->
maxOffsetUncommitted(logicalQueueRouteData1.getMessageQueue()) ==
mqAdminExt.maxOffset(logicalQueueRouteData1.getMessageQueue()));
- waitAtMost(5, TimeUnit.SECONDS).until(() ->
maxOffsetUncommitted(logicalQueueRouteData2.getMessageQueue()) ==
mqAdminExt.maxOffset(logicalQueueRouteData2.getMessageQueue()));
-
- // now verify after commit log cleaned, toBroker's first queue route
data will be expired too
- brokerController =
brokerControllerMap.get(logicalQueueRouteData2.getBrokerName());
- rotateBrokerCommitLog(brokerController);
- deleteCommitLogFiles(brokerController, 1);
-
- {
- topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
- List<LogicalQueueRouteData> logicalQueueRouteDataList =
topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx);
- assertThat(logicalQueueRouteDataList).hasSize(2);
-
assertThat(logicalQueueRouteDataList.get(0)).isEqualToIgnoringGivenFields(new
LogicalQueueRouteData(logicalQueueIdx, 0, new MessageQueue(topic,
fromBrokerName, logicalQueueIdx), MessageQueueRouteState.ReadOnly, 0, 3, -1,
-1, fromBrokerAddr), "firstMsgTimeMillis", "lastMsgTimeMillis");
-
assertThat(logicalQueueRouteDataList.get(1)).isEqualToComparingFieldByField(new
LogicalQueueRouteData(logicalQueueIdx, 2 * MSG_SENT_TIMES, new
MessageQueue(topic, fromBrokerName, logicalQueueIdx),
MessageQueueRouteState.Normal, MSG_SENT_TIMES, -1, -1, -1, fromBrokerAddr));
- }
-
- // try pull again, since there is an expired queue route in the middle.
- {
- int msgCount = 0;
- Queue<Integer> wantMsgIdx = new LinkedList<>();
- wantMsgIdx.addAll(IntStream.range(0,
MSG_SENT_TIMES).boxed().collect(Collectors.toList()));
- wantMsgIdx.addAll(IntStream.range(2 * MSG_SENT_TIMES, 3 *
MSG_SENT_TIMES).boxed().collect(Collectors.toList()));
- forLoop:
- for (long offset = mqAdminExt.minOffset(migratedMessageQueue); ; )
{
- PullResult pullResult = consumer.pull(migratedMessageQueue,
"*", offset, 3 * MSG_SENT_TIMES);
- switch (pullResult.getPullStatus()) {
- case NO_NEW_MSG:
- assertThat(msgCount).as("offset=%d",
offset).isEqualTo(2 * MSG_SENT_TIMES);
- break forLoop;
- case OFFSET_ILLEGAL:
- offset = pullResult.getNextBeginOffset();
- break;
- case FOUND:
- msgCount += pullResult.getMsgFoundList().size();
- boolean first = true;
- for (MessageExt msg : pullResult.getMsgFoundList()) {
- assertThat(new String(msg.getBody(),
StandardCharsets.UTF_8)).as("offset=%d",
offset).isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d", methodName,
logicalQueueIdx, wantMsgIdx.poll()));
- if (first) {
-
assertThat(msg.getQueueOffset()).isGreaterThanOrEqualTo(offset);
- first = false;
- } else {
-
assertThat(msg.getQueueOffset()).isGreaterThan(offset);
- }
- offset = msg.getQueueOffset();
- }
- offset = pullResult.getNextBeginOffset();
- break;
- default:
- Assert.fail(String.format(Locale.ENGLISH, "unexpected
pull offset=%d status: %s", offset, pullResult));
- }
- }
- }
-
- // rotate first queue route to expired, and pull it
- brokerController =
brokerControllerMap.get(logicalQueueRouteData1.getBrokerName());
- rotateBrokerCommitLog(brokerController);
- deleteCommitLogFiles(brokerController, 2);
-
- {
- List<LogicalQueueRouteData> logicalQueueRouteDataList;
- topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
- logicalQueueRouteDataList =
topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx);
-
assertThat(logicalQueueRouteDataList).isEqualTo(Collections.singletonList(new
LogicalQueueRouteData(logicalQueueIdx, 2 * MSG_SENT_TIMES, new
MessageQueue(topic, fromBrokerName, logicalQueueIdx),
MessageQueueRouteState.Normal, MSG_SENT_TIMES, -1, -1, -1, fromBrokerAddr)));
- }
-
- {
- int msgCount = 0;
- Queue<Integer> wantMsgIdx = new LinkedList<>();
- wantMsgIdx.addAll(IntStream.range(2 * MSG_SENT_TIMES, 3 *
MSG_SENT_TIMES).boxed().collect(Collectors.toList()));
- forLoop:
- for (long offset = mqAdminExt.minOffset(migratedMessageQueue); ; )
{
- PullResult pullResult = consumer.pull(migratedMessageQueue,
"*", offset, 3 * MSG_SENT_TIMES);
- switch (pullResult.getPullStatus()) {
- case NO_NEW_MSG:
- if (msgCount != MSG_SENT_TIMES) {
- Assert.fail(String.format(Locale.ENGLISH, "want %d
msg but got %d", MSG_SENT_TIMES, msgCount));
- }
- break forLoop;
- case OFFSET_ILLEGAL:
- offset = pullResult.getNextBeginOffset();
- break;
- case FOUND:
- msgCount += pullResult.getMsgFoundList().size();
- boolean first = true;
- for (MessageExt msg : pullResult.getMsgFoundList()) {
- assertThat(new String(msg.getBody(),
StandardCharsets.UTF_8)).as("offset=%d",
offset).isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d", methodName,
logicalQueueIdx, wantMsgIdx.poll()));
- if (first) {
-
assertThat(msg.getQueueOffset()).isGreaterThanOrEqualTo(offset);
- first = false;
- } else {
-
assertThat(msg.getQueueOffset()).isGreaterThan(offset);
- }
- offset = msg.getQueueOffset();
- }
- offset = pullResult.getNextBeginOffset();
- break;
- default:
- Assert.fail(String.format(Locale.ENGLISH, "unexpected
pull offset=%d status: %s", offset, pullResult));
- }
- }
- }
-
- brokerController = brokerControllerMap.get(fromBrokerName);
- rotateBrokerCommitLog(brokerController);
- deleteCommitLogFiles(brokerController, 1);
-
- {
- forLoop:
- for (long offset = mqAdminExt.minOffset(migratedMessageQueue); ; )
{
- PullResult pullResult = consumer.pull(migratedMessageQueue,
"*", offset, 3 * MSG_SENT_TIMES);
- // commit log rotate and cleaned, so there is no message.
- switch (pullResult.getPullStatus()) {
- case NO_MATCHED_MSG:
- case NO_NEW_MSG:
-
assertThat(pullResult.getNextBeginOffset()).isEqualTo(3 * MSG_SENT_TIMES);
- break forLoop;
- case OFFSET_ILLEGAL:
- offset = pullResult.getNextBeginOffset();
- break;
- default:
- Assert.fail(String.format(Locale.ENGLISH, "unexpected
pull offset=%d status: %s", offset, pullResult));
- }
- }
- }
-
- {
- LogicalQueuesInfo logicalQueuesInfo =
mqAdminExt.queryTopicLogicalQueueMapping(brokerController.getBrokerAddr(),
topic);
- List<LogicalQueueRouteData> logicalQueueRouteDataList =
logicalQueuesInfo.get(logicalQueueIdx);
-
assertThat(logicalQueueRouteDataList).isEqualTo(Collections.singletonList(new
LogicalQueueRouteData(logicalQueueIdx, 2 * MSG_SENT_TIMES, new
MessageQueue(topic, fromBrokerName, logicalQueueIdx),
MessageQueueRouteState.Normal, MSG_SENT_TIMES, -1, -1, -1, fromBrokerAddr)));
- }
-
- // try migrate to this broker which has a expired queue, expect it
will reuse the expired one, pull it to verify if delta works well
- new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic,
logicalQueueIdx, toBrokerName, null);
-
- {
- List<LogicalQueueRouteData> logicalQueueRouteDataList;
- topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
- logicalQueueRouteDataList =
topicRouteInfo.getLogicalQueuesInfo().get(logicalQueueIdx);
- assertThat(logicalQueueRouteDataList).isEqualTo(Arrays.asList(
- new LogicalQueueRouteData(logicalQueueIdx, 2 * MSG_SENT_TIMES,
new MessageQueue(topic, fromBrokerName, logicalQueueIdx),
MessageQueueRouteState.Expired, MSG_SENT_TIMES, 2 * MSG_SENT_TIMES, 0, 0,
fromBrokerAddr)
- , new LogicalQueueRouteData(logicalQueueIdx, 3 *
MSG_SENT_TIMES, new MessageQueue(topic, toBrokerName, QUEUE_NUMBERS),
MessageQueueRouteState.Normal, MSG_SENT_TIMES, -1, -1, -1, toBrokerAddr)
- ));
-
- LogicalQueuesInfo info;
- info = mqAdminExt.queryTopicLogicalQueueMapping(fromBrokerAddr,
topic);
- logicalQueueRouteDataList = info.get(logicalQueueIdx);
- assertThat(logicalQueueRouteDataList).isEqualTo(Arrays.asList(
- new LogicalQueueRouteData(logicalQueueIdx, 2 * MSG_SENT_TIMES,
new MessageQueue(topic, fromBrokerName, logicalQueueIdx),
MessageQueueRouteState.Expired, MSG_SENT_TIMES, 2 * MSG_SENT_TIMES, 0, 0,
fromBrokerAddr)
- , new LogicalQueueRouteData(logicalQueueIdx, 3 *
MSG_SENT_TIMES, new MessageQueue(topic, toBrokerName, QUEUE_NUMBERS),
MessageQueueRouteState.Normal, MSG_SENT_TIMES, -1, -1, -1, toBrokerAddr)
- ));
- info = mqAdminExt.queryTopicLogicalQueueMapping(toBrokerAddr,
topic);
- logicalQueueRouteDataList = info.get(logicalQueueIdx);
-
assertThat(logicalQueueRouteDataList).isEqualTo(Collections.singletonList(new
LogicalQueueRouteData(logicalQueueIdx, 3 * MSG_SENT_TIMES, new
MessageQueue(topic, toBrokerName, QUEUE_NUMBERS),
MessageQueueRouteState.Normal, MSG_SENT_TIMES, -1, -1, -1, toBrokerAddr)));
- }
-
- msgIdx = 3 * MSG_SENT_TIMES;
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- SendResult sendResult = producer.send(new Message(topic,
String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueueIdx,
msgIdx++).getBytes(StandardCharsets.UTF_8)), migratedMessageQueue);
- SendResultForLogicalQueue sendResult2 =
(SendResultForLogicalQueue) sendResult;
-
assertThat(sendResult2.getOrigBrokerName()).isEqualTo(toBrokerName);
- assertThat(sendResult2.getOrigQueueId()).isEqualTo(QUEUE_NUMBERS);
- }
-
- {
- int msgCount = 0;
- Queue<Integer> wantMsgIdx = new LinkedList<>();
- wantMsgIdx.addAll(IntStream.range(3 * MSG_SENT_TIMES, 4 *
MSG_SENT_TIMES).boxed().collect(Collectors.toList()));
- LOOP:
- for (long offset = 0L; ; ) {
- PullResult pullResult = consumer.pull(migratedMessageQueue,
"*", offset, 3 * MSG_SENT_TIMES);
- switch (pullResult.getPullStatus()) {
- case NO_NEW_MSG:
- assertThat(msgCount).as("msgCount with offset=%d",
offset).isEqualTo(MSG_SENT_TIMES);
- break LOOP;
- case OFFSET_ILLEGAL:
-
assertThat(pullResult.getNextBeginOffset()).isNotEqualTo(Long.MIN_VALUE);
- offset = pullResult.getNextBeginOffset();
- break;
- case FOUND:
- msgCount += pullResult.getMsgFoundList().size();
- boolean first = true;
- for (MessageExt msg : pullResult.getMsgFoundList()) {
- assertThat(new String(msg.getBody(),
StandardCharsets.UTF_8)).as("offset=%d",
offset).isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d", methodName,
logicalQueueIdx, wantMsgIdx.poll()));
- if (first) {
-
assertThat(msg.getQueueOffset()).isGreaterThanOrEqualTo(offset);
- first = false;
- } else {
-
assertThat(msg.getQueueOffset()).isGreaterThan(offset);
- }
- offset = msg.getQueueOffset();
- }
- offset = pullResult.getNextBeginOffset();
- break;
- default:
- Assert.fail(String.format(Locale.ENGLISH, "unexpected
pull offset=%d status: %s", offset, pullResult));
- }
- }
- }
- }
-
- @Test
- public void test006_LogicalQueueNumChanged() throws Exception {
- String methodName = getCurrentMethodName();
- int logicalQueueNum = brokerNum * QUEUE_NUMBERS;
-
- List<MessageQueue> publishMessageQueues;
- publishMessageQueues = producer.fetchPublishMessageQueues(topic);
- assertThat(publishMessageQueues).hasSize(logicalQueueNum);
- List<MessageQueue> subscribeMessageQueues;
- subscribeMessageQueues =
consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList());
- assertThat(subscribeMessageQueues).hasSize(logicalQueueNum);
-
- logicalQueueNum++;
- new UpdateTopicLogicalQueueNumCommand().execute(mqAdminExt,
clusterName, topic, logicalQueueNum);
-
- int newAddLogicalQueueIdx = logicalQueueNum - 1;
- MessageQueue newAddLogicalQueue = new MessageQueue(topic,
MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, newAddLogicalQueueIdx);
- String newAddLogicalQueueBrokerName;
- {
- TopicRouteData topicRouteInfo =
mqAdminExt.examineTopicRouteInfo(topic);
- LogicalQueuesInfo info = topicRouteInfo.getLogicalQueuesInfo();
- assertThat(info).isNotNull();
- List<LogicalQueueRouteData> queueRouteDataList =
info.get(newAddLogicalQueueIdx);
- assertThat(queueRouteDataList).isNotNull();
- assertThat(queueRouteDataList).hasSize(1);
- LogicalQueueRouteData queueRouteData = queueRouteDataList.get(0);
- newAddLogicalQueueBrokerName = queueRouteData.getBrokerName();
-
assertThat(queueRouteData.getState()).isEqualTo(MessageQueueRouteState.Normal);
- assertThat(queueRouteData.getLogicalQueueDelta()).isEqualTo(0);
-
assertThat(queueRouteData.getLogicalQueueIndex()).isEqualTo(newAddLogicalQueueIdx);
- }
-
- publishMessageQueues = producer.fetchPublishMessageQueues(topic);
- assertThat(publishMessageQueues).hasSize(logicalQueueNum);
- Set<Integer> logicalQueueIds = IntStream.range(0,
logicalQueueNum).boxed().collect(Collectors.toSet());
- Map<String, Set<Integer>> queueIds = Maps.newHashMap();
- for (String brokerName : Arrays.asList(broker1Name, broker2Name)) {
- queueIds.put(brokerName, IntStream.range(0,
QUEUE_NUMBERS).boxed().collect(Collectors.toSet()));
- }
- queueIds.get(newAddLogicalQueueBrokerName).add(QUEUE_NUMBERS);
- for (MessageQueue messageQueue : publishMessageQueues) {
-
assertThat(messageQueue.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
-
assertThat(logicalQueueIds.remove(messageQueue.getQueueId())).isTrue();
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- SendResult sendResult = producer.send(new Message(topic,
String.format(Locale.ENGLISH, "%s-%d-%d", methodName,
messageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)), messageQueue);
-
assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(messageQueue.getBrokerName());
-
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(messageQueue.getQueueId());
- if (i == 0) {
- SendResultForLogicalQueue sendResult2 =
(SendResultForLogicalQueue) sendResult;
-
assertThat(queueIds.get(sendResult2.getOrigBrokerName()).remove(sendResult2.getOrigQueueId())).as("brokerName
%s queueId %d", sendResult2.getOrigBrokerName(),
sendResult2.getOrigQueueId()).isTrue();
- }
- }
- }
- assertThat(logicalQueueIds).isEmpty();
-
- subscribeMessageQueues =
consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList());
- assertThat(subscribeMessageQueues).hasSize(logicalQueueNum);
-
subscribeMessageQueues.sort(Comparator.comparingInt(MessageQueue::getQueueId));
- logicalQueueIds.addAll(IntStream.range(0,
logicalQueueNum).boxed().collect(Collectors.toSet()));
- for (MessageQueue messageQueue : subscribeMessageQueues) {
-
assertThat(messageQueue.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
-
assertThat(logicalQueueIds.remove(messageQueue.getQueueId())).isTrue();
- long offset = mqAdminExt.minOffset(messageQueue);
- assertThat(offset).isEqualTo(0);
- PullResult pullResult = consumer.pull(messageQueue, "*", offset,
10);
- assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
- assertThat(pullResult.getMsgFoundList()).hasSize(MSG_SENT_TIMES);
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- MessageExt msg = pullResult.getMsgFoundList().get(i);
-
assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
-
assertThat(msg.getQueueId()).isEqualTo(messageQueue.getQueueId());
- assertThat(new String(msg.getBody(),
StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d",
methodName, messageQueue.getQueueId(), i));
- assertThat(msg.getQueueOffset()).isEqualTo(offset + i);
- }
- assertThat(maxOffsetUncommitted(messageQueue)).isEqualTo(offset +
MSG_SENT_TIMES);
- }
- assertThat(logicalQueueIds).isEmpty();
-
- // increase TopicConfig write queue first then increase logical queue,
expect to reuse
- String broker2Addr = brokerController2.getBrokerAddr();
- TopicConfig topicConfig = mqAdminExt.examineTopicConfig(broker2Addr,
topic);
- topicConfig.setWriteQueueNums(topicConfig.getWriteQueueNums() + 1);
- topicConfig.setReadQueueNums(topicConfig.getReadQueueNums() + 1);
- mqAdminExt.createAndUpdateTopicConfig(broker2Addr, topicConfig);
- logicalQueueNum++;
- new UpdateTopicLogicalQueueNumCommand().execute(mqAdminExt,
clusterName, topic, logicalQueueNum);
- {
- newAddLogicalQueueIdx = logicalQueueNum -1;
- TopicRouteData topicRouteInfo =
mqAdminExt.examineTopicRouteInfo(topic);
- LogicalQueuesInfo info = topicRouteInfo.getLogicalQueuesInfo();
- assertThat(info).isNotNull();
- List<LogicalQueueRouteData> queueRouteDataList =
info.get(newAddLogicalQueueIdx);
- assertThat(queueRouteDataList).isNotNull();
- assertThat(queueRouteDataList).hasSize(1);
- LogicalQueueRouteData queueRouteData = queueRouteDataList.get(0);
-
assertThat(queueRouteData.getState()).isEqualTo(MessageQueueRouteState.Normal);
- assertThat(queueRouteData.getLogicalQueueDelta()).isEqualTo(0);
-
assertThat(queueRouteData.getLogicalQueueIndex()).isEqualTo(newAddLogicalQueueIdx);
- assertThat(queueRouteData.getBrokerName()).isEqualTo(broker2Name);
-
assertThat(queueRouteData.getQueueId()).isEqualTo(topicConfig.getWriteQueueNums()
-1);
- }
-
- logicalQueueNum-=2;
- new UpdateTopicLogicalQueueNumCommand().execute(mqAdminExt,
clusterName, topic, logicalQueueNum);
-
- try {
- producer.send(new Message(topic,
"aaa".getBytes(StandardCharsets.UTF_8)), newAddLogicalQueue);
- Assert.fail("write to decreased logical queue success, want it
failed");
- } catch (MQBrokerException e) {
-
assertThat(e.getResponseCode()).isEqualTo(ResponseCode.NO_PERMISSION);
- }
- {
- int offset = 0;
- PullResult pullResult = consumer.pull(newAddLogicalQueue, "*",
offset, 10);
- assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
- assertThat(pullResult.getMsgFoundList()).hasSize(MSG_SENT_TIMES);
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- MessageExt msg = pullResult.getMsgFoundList().get(i);
-
assertThat(msg.getBrokerName()).isEqualTo(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME);
-
assertThat(msg.getQueueId()).isEqualTo(newAddLogicalQueue.getQueueId());
- assertThat(new String(msg.getBody(),
StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d",
methodName, newAddLogicalQueue.getQueueId(), i));
- assertThat(msg.getQueueOffset()).isEqualTo(offset + i);
- }
- }
-
- // rotate to remove new add queue's data, and try pull again
- {
- BrokerController brokerController =
brokerControllerMap.get(newAddLogicalQueueBrokerName);
- rotateBrokerCommitLog(brokerController);
- deleteCommitLogFiles(brokerController, 1);
- }
- {
- int offset = 0;
- PullResult pullResult = consumer.pull(newAddLogicalQueue, "*",
offset, 10);
- assertThat(pullResult.getPullStatus()).isIn(PullStatus.NO_NEW_MSG,
PullStatus.NO_MATCHED_MSG);
- }
- }
-
- @Test
- public void test007_LogicalQueueWritableEvenBrokerDown() throws Exception {
- final String methodName = getCurrentMethodName();
-
- final int logicalQueueIdx = 1;
-
- BrokerController brokerController3 =
IntegrationTestBase.createAndStartBroker(nsAddr);
- String broker3Name =
brokerController3.getBrokerConfig().getBrokerName();
- brokerControllerMap.put(broker3Name, brokerController3);
- await().atMost(30, TimeUnit.SECONDS).until(() ->
mqAdminExt.examineBrokerClusterInfo().getBrokerAddrTable().containsKey(broker3Name));
-
mqAdminExt.createAndUpdateTopicConfig(brokerController3.getBrokerAddr(), new
TopicConfig(topic, 0, 0, PermName.PERM_READ | PermName.PERM_WRITE));
-
- new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic,
logicalQueueIdx, brokerController3.getBrokerConfig().getBrokerName(), null);
-
- MessageQueue migrateMessageQueue = new MessageQueue(topic,
MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, logicalQueueIdx);
- {
- for (int i = 0; i < MSG_SENT_TIMES; i++) {
- SendResult sendResult = producer.send(new Message(topic,
String.format(Locale.ENGLISH, "%s-%d-%d", methodName,
migrateMessageQueue.getQueueId(), i).getBytes(StandardCharsets.UTF_8)),
migrateMessageQueue);
-
assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(migrateMessageQueue.getBrokerName());
-
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(migrateMessageQueue.getQueueId());
- SendResultForLogicalQueue sendResult2 =
(SendResultForLogicalQueue) sendResult;
-
assertThat(sendResult2.getOrigBrokerName()).isEqualTo(broker3Name);
- assertThat(sendResult2.getOrigQueueId()).isEqualTo(0);
- }
- }
- brokerController3.shutdown();
- brokerControllerMap.remove(broker3Name);
-
- assertThatThrownBy(() -> {
- SendResult sendResult = producer.send(new Message(topic,
"aaa".getBytes(StandardCharsets.UTF_8)), migrateMessageQueue);
- logger.error("send should fail but got {}", sendResult);
- }).isInstanceOf(RemotingException.class).hasMessageMatching("connect
to [0-9.:]+ failed");
-
- assertThatThrownBy(() -> {
- new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic,
logicalQueueIdx, broker1Name, null);
-
}).hasRootCauseInstanceOf(RemotingConnectException.class).hasMessageContaining("migrateTopicLogicalQueuePrepare");
-
- {
- SendResult sendResult = producer.send(new Message(topic,
"aaa".getBytes(StandardCharsets.UTF_8)), migrateMessageQueue);
-
assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(migrateMessageQueue.getBrokerName());
-
assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(migrateMessageQueue.getQueueId());
- assertThat(sendResult.getQueueOffset()).isEqualTo(-1);
- SendResultForLogicalQueue sendResult2 =
(SendResultForLogicalQueue) sendResult;
- assertThat(sendResult2.getOrigBrokerName()).isEqualTo(broker1Name);
- assertThat(sendResult2.getOrigQueueId()).isIn(
- /* CommitLog not rotated, will not reuse */QUEUE_NUMBERS,
- /* CommitLog rotated in other test cases, will reuse
*/logicalQueueIdx
- );
- }
- }
-
- private static String getBrokerCommitLogFileName(BrokerController
brokerController) throws IllegalAccessException {
- DefaultMessageStore defaultMessageStore = (DefaultMessageStore)
brokerController.getMessageStore();
- MappedFileQueue mfq = (MappedFileQueue)
FieldUtils.readDeclaredField(defaultMessageStore.getCommitLog(),
"mappedFileQueue", true);
- return mfq.getLastMappedFile().getFileName();
- }
-
- private static void deleteCommitLogFiles(BrokerController brokerController,
- int keepNum) throws IllegalAccessException {
- CommitLog commitLog = ((DefaultMessageStore)
brokerController.getMessageStore()).getCommitLog();
- commitLog.flush();
- MappedFileQueue mfq = (MappedFileQueue)
FieldUtils.readDeclaredField(commitLog, "mappedFileQueue", true);
- AtomicInteger count = new AtomicInteger();
- waitAtMost(5, TimeUnit.SECONDS).until(() -> {
- count.getAndAdd(commitLog.deleteExpiredFile(0, 0, 5000, true, 1));
- return mfq.getMappedFiles().size() <= keepNum;
- });
-
brokerController.getTopicConfigManager().getLogicalQueueCleanHook().execute((DefaultMessageStore)
brokerController.getMessageStore(), count.get());
- logger.info("deleteCommitLogFiles {} count {}",
brokerController.getBrokerConfig().getBrokerName(), count.get());
- }
-
- private static void rotateBrokerCommitLog(BrokerController
brokerController) throws IllegalAccessException {
- CommitLog commitLog = ((DefaultMessageStore)
brokerController.getMessageStore()).getCommitLog();
- commitLog.flush();
- String brokerName = brokerController.getBrokerConfig().getBrokerName();
- String fileName1 = getBrokerCommitLogFileName(brokerController);
- logger.info("rotateBrokerCommitLog {} first {}", brokerName,
fileName1);
- int msgSize = 4 * 1024;
- byte[] data =
RandomStringUtils.randomAscii(msgSize).getBytes(StandardCharsets.UTF_8);
- Message msg = new Message(placeholderTopic, data);
- MessageQueue mq = new MessageQueue(placeholderTopic, brokerName, 0);
- waitAtMost(5, TimeUnit.SECONDS).until(() -> {
- for (int i = 0; i < 128; i++) {
- producer.send(msg, mq);
- }
- commitLog.flush();
- String fileName2 = getBrokerCommitLogFileName(brokerController);
- if (!fileName1.equals(fileName2)) {
- logger.info("rotateBrokerCommitLog {} 4K msg last {}",
brokerName, fileName2);
- return true;
- }
- return false;
- });
- }
-
- private long maxOffsetUncommitted(MessageQueue mq) throws
IllegalAccessException, MQClientException {
- DefaultMQAdminExtImpl defaultMQAdminExtImpl = (DefaultMQAdminExtImpl)
FieldUtils.readDeclaredField(mqAdminExt, "defaultMQAdminExtImpl", true);
- return defaultMQAdminExtImpl.maxOffset(mq, false);
- }
-}
diff --git
a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
new file mode 100644
index 0000000..9dd116d
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
@@ -0,0 +1,76 @@
+package org.apache.rocketmq.test.smoke;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.rpc.ClientMetadata;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.util.MQRandomUtils;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.getMappingDetailFromConfig;
+
+@FixMethodOrder
+public class StaticTopicIT extends BaseConf {
+
+ private static Logger logger = Logger.getLogger(StaticTopicIT.class);
+ private DefaultMQAdminExt defaultMQAdminExt;
+ private ClientMetadata clientMetadata;
+
+ @Before
+ public void setUp() throws Exception {
+ defaultMQAdminExt = getAdmin(nsAddr);
+ waitBrokerRegistered(nsAddr, clusterName);
+ clientMetadata = new ClientMetadata();
+ ClusterInfo clusterInfo =
defaultMQAdminExt.examineBrokerClusterInfo();
+ if (clusterInfo == null
+ || clusterInfo.getClusterAddrTable().isEmpty()) {
+ throw new RuntimeException("The Cluster info is empty");
+ }
+ clientMetadata.refreshClusterInfo(clusterInfo);
+
+ }
+
+ @Test
+ public void testCreateStaticTopic() throws Exception {
+ String topic = "static" + MQRandomUtils.getRandomTopic();
+ int queueNum = 10;
+ Set<String> brokers = getBrokers();
+ //create topic
+ {
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMap =
defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+ Assert.assertTrue(brokerConfigMap.isEmpty());
+ TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum,
getBrokers(), brokerConfigMap);
+ //If some succeed, and others fail, it will cause inconsistent data
+ for (Map.Entry<String, TopicConfigAndQueueMapping> entry :
brokerConfigMap.entrySet()) {
+ String broker = entry.getKey();
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicConfigAndQueueMapping configMapping = entry.getValue();
+ defaultMQAdminExt.createStaticTopic(addr,
defaultMQAdminExt.getCreateTopicKey(), configMapping,
configMapping.getMappingDetail(), false);
+ }
+ }
+ Map<String, TopicConfigAndQueueMapping> brokerConfigMap =
defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+
+
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic,
brokerConfigMap);
+ Map<Integer, TopicQueueMappingOne> globalIdMap =
TopicQueueMappingUtils.checkAndBuildMappingItems(new
ArrayList<>(getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
+ Assert.assertEquals(queueNum, globalIdMap.size());
+
+ }
+
+ @After
+ public void tearDown() {
+ super.shutdown();
+ }
+
+}
diff --git
a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index f1f237d..857c76e 100644
---
a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++
b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -150,13 +150,7 @@ public class DefaultMQAdminExtTest {
topicRouteData.setBrokerDatas(brokerDatas);
topicRouteData.setQueueDatas(new ArrayList<QueueData>());
topicRouteData.setFilterServerTable(new HashMap<String,
List<String>>());
- LogicalQueuesInfo logicalQueuesInfoinfo = new LogicalQueuesInfo();
- logicalQueuesInfoinfo.put(0, Lists.newArrayList(
- new LogicalQueueRouteData(0, 0, new MessageQueue(topic1,
broker1Name, 0), MessageQueueRouteState.ReadOnly, 0, 1000, 2000, 3000,
broker1Addr),
- new LogicalQueueRouteData(0, 1000, new MessageQueue(topic1,
broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr)
- ));
- topicRouteData.setLogicalQueuesInfo(logicalQueuesInfoinfo);
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(), any())).thenReturn(topicRouteData);
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(topicRouteData);
HashMap<String, String> result = new HashMap<>();
result.put("id", String.valueOf(MixAll.MASTER_ID));