This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by 
this push:
     new 1f89733  Polish compile errrors
1f89733 is described below

commit 1f897336097e34bb02546dfdd80c814382adb87a
Author: dongeforever <[email protected]>
AuthorDate: Tue Nov 9 20:20:27 2021 +0800

    Polish compile errrors
---
 .../client/consumer/DefaultMQPushConsumer.java     |   2 +-
 .../rocketmq/client/impl/MQClientAPIImpl.java      |   5 -
 .../client/impl/consumer/PullAPIWrapper.java       | 350 ++-------------------
 .../DefaultMQPullConsumerLogicalQueueTest.java     | 248 ---------------
 .../DefaultMQProducerLogicalQueueTest.java         | 311 ------------------
 .../client/producer/DefaultMQProducerTest.java     |  28 +-
 .../DefaultMQProducerWithOpenTracingTest.java      |  14 +-
 .../trace/DefaultMQProducerWithTraceTest.java      |  26 +-
 .../TransactionMQProducerWithOpenTracingTest.java  |  16 +-
 .../trace/TransactionMQProducerWithTraceTest.java  |  40 +--
 .../protocol/route/TopicRouteDataNameSrv.java      |  64 ----
 .../rocketmq/common/RegisterBrokerBodyTest.java    |   3 +-
 .../processor/DefaultRequestProcessorTest.java     |  17 +-
 13 files changed, 78 insertions(+), 1046 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index cf8cbb0..8a6340b 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -691,7 +691,7 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     public void sendMessageBack(MessageExt msg, int delayLevel)
         throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
+        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, 
(String) null);
     }
 
     /**
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index c07f9fa..67fd937 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.client.impl;
 
-import com.google.common.base.Function;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -180,9 +179,7 @@ import 
org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
 import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
 import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.InvokeCallback;
@@ -201,8 +198,6 @@ import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
-import static com.google.common.base.Optional.fromNullable;
-
 public class MQClientAPIImpl {
 
     private final static InternalLogger log = ClientLogger.getLog();
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index 6d966a6..8cd4bab 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -16,15 +16,20 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
-import com.alibaba.fastjson.JSON;
-import com.google.common.base.Objects;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.client.consumer.PopCallback;
+import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.exception.MQRedirectException;
 import org.apache.rocketmq.client.hook.FilterMessageContext;
 import org.apache.rocketmq.client.hook.FilterMessageHook;
 import org.apache.rocketmq.client.impl.CommunicationMode;
@@ -34,35 +39,18 @@ import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
-import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static com.google.common.base.Optional.fromNullable;
-
 public class PullAPIWrapper {
     private final InternalLogger log = ClientLogger.getLog();
     private final MQClientInstance mQClientFactory;
@@ -83,36 +71,13 @@ public class PullAPIWrapper {
 
     public PullResult processPullResult(final MessageQueue mq, final 
PullResult pullResult,
         final SubscriptionData subscriptionData) {
-        final PullResultExt pullResultExt = (PullResultExt) pullResult;
-
-        LogicalQueueRouteData queueRouteData = null;
-        PullResultWithLogicalQueues pullResultWithLogicalQueues = null;
-        if (pullResultExt instanceof PullResultWithLogicalQueues) {
-            pullResultWithLogicalQueues = (PullResultWithLogicalQueues) 
pullResultExt;
-            queueRouteData = pullResultWithLogicalQueues.getQueueRouteData();
-        }
-
-        if (queueRouteData != null) {
-            pullResultWithLogicalQueues.setOrigPullResultExt(new 
PullResultExt(pullResultExt.getPullStatus(),
-                
queueRouteData.toLogicalQueueOffset(pullResultExt.getNextBeginOffset()),
-                
queueRouteData.toLogicalQueueOffset(pullResultExt.getMinOffset()),
-                // although this maxOffset may not belong to this queue route, 
but the actual value must be a larger one, and since maxOffset here is not an 
accurate value, we just do it to make things simple.
-                
queueRouteData.toLogicalQueueOffset(pullResultExt.getMaxOffset()),
-                pullResultExt.getMsgFoundList(),
-                pullResultExt.getSuggestWhichBrokerId(),
-                pullResultExt.getMessageBinary()));
-        }
+        PullResultExt pullResultExt = (PullResultExt) pullResult;
 
         this.updatePullFromWhichNode(mq, 
pullResultExt.getSuggestWhichBrokerId());
         if (PullStatus.FOUND == pullResult.getPullStatus()) {
             ByteBuffer byteBuffer = 
ByteBuffer.wrap(pullResultExt.getMessageBinary());
             List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
 
-            if (queueRouteData != null) {
-                // prevent pulled data is out of current queue route, this 
happens when some commit log data is cleaned in the broker but still pull from 
it.
-                msgList = queueRouteData.filterMessages(msgList);
-            }
-
             List<MessageExt> msgListFilterAgain = msgList;
             if (!subscriptionData.getTagsSet().isEmpty() && 
!subscriptionData.isClassFilterMode()) {
                 msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
@@ -142,10 +107,6 @@ public class PullAPIWrapper {
                 MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_MAX_OFFSET,
                     Long.toString(pullResult.getMaxOffset()));
                 msg.setBrokerName(mq.getBrokerName());
-                msg.setQueueId(mq.getQueueId());
-                if (queueRouteData != null) {
-                    
msg.setQueueOffset(queueRouteData.toLogicalQueueOffset(msg.getQueueOffset()));
-                }
             }
 
             pullResultExt.setMsgFoundList(msgListFilterAgain);
@@ -153,7 +114,7 @@ public class PullAPIWrapper {
 
         pullResultExt.setMessageBinary(null);
 
-        return pullResultExt;
+        return pullResult;
     }
 
     public void updatePullFromWhichNode(final MessageQueue mq, final long 
brokerId) {
@@ -182,67 +143,24 @@ public class PullAPIWrapper {
     }
 
     public PullResult pullKernelImpl(
-        MessageQueue mq,
+        final MessageQueue mq,
         final String subExpression,
         final String expressionType,
         final long subVersion,
-        long offset,
+        final long offset,
         final int maxNums,
         final int sysFlag,
-        long commitOffset,
+        final long commitOffset,
         final long brokerSuspendMaxTimeMillis,
         final long timeoutMillis,
         final CommunicationMode communicationMode,
-        PullCallback pullCallback
+        final PullCallback pullCallback
     ) throws MQClientException, RemotingException, MQBrokerException, 
InterruptedException {
-        if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(mq.getBrokerName())) {
-            LogicalQueueContext logicalQueueContext = new 
LogicalQueueContext(mq, subExpression, expressionType, subVersion, offset, 
maxNums, sysFlag, commitOffset, brokerSuspendMaxTimeMillis, timeoutMillis, 
communicationMode, pullCallback);
-            while (true) {
-                try {
-                    MessageQueue messageQueue = 
logicalQueueContext.getModifiedMessageQueue();
-                    if (messageQueue == null) {
-                        if (pullCallback != null) {
-                            
pullCallback.onSuccess(logicalQueueContext.getPullResult());
-                            return null;
-                        } else {
-                            return logicalQueueContext.getPullResult();
-                        }
-                    }
-                    PullResult pullResult = 
this.pullKernelImplWithoutRetry(messageQueue, subExpression, expressionType, 
subVersion, logicalQueueContext.getModifiedOffset(), maxNums, sysFlag, 
logicalQueueContext.getModifiedCommitOffset(), brokerSuspendMaxTimeMillis, 
timeoutMillis, communicationMode, logicalQueueContext.wrapPullCallback());
-                    return logicalQueueContext.wrapPullResult(pullResult);
-                } catch (MQRedirectException e) {
-                    if (!logicalQueueContext.shouldRetry(e)) {
-                        throw new MQBrokerException(ResponseCode.SYSTEM_ERROR, 
"redirect");
-                    }
-                }
-            }
-        } else {
-            return this.pullKernelImplWithoutRetry(mq, subExpression, 
expressionType, subVersion, offset, maxNums, sysFlag, commitOffset, 
brokerSuspendMaxTimeMillis, timeoutMillis, communicationMode, pullCallback);
-        }
-    }
-
-    public PullResult pullKernelImplWithoutRetry(
-        MessageQueue mq,
-        final String subExpression,
-        final String expressionType,
-        final long subVersion,
-        long offset,
-        final int maxNums,
-        final int sysFlag,
-        long commitOffset,
-        final long brokerSuspendMaxTimeMillis,
-        final long timeoutMillis,
-        final CommunicationMode communicationMode,
-        PullCallback pullCallback
-    ) throws MQClientException, RemotingException, MQBrokerException, 
InterruptedException {
-        String topic = mq.getTopic();
-        int queueId = mq.getQueueId();
-
         FindBrokerResult findBrokerResult =
             
this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq),
                 this.recalculatePullFromWhichNode(mq), false);
         if (null == findBrokerResult) {
-            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+            
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
             findBrokerResult =
                 
this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq),
                     this.recalculatePullFromWhichNode(mq), false);
@@ -265,8 +183,8 @@ public class PullAPIWrapper {
 
             PullMessageRequestHeader requestHeader = new 
PullMessageRequestHeader();
             requestHeader.setConsumerGroup(this.consumerGroup);
-            requestHeader.setTopic(topic);
-            requestHeader.setQueueId(queueId);
+            requestHeader.setTopic(mq.getTopic());
+            requestHeader.setQueueId(mq.getQueueId());
             requestHeader.setQueueOffset(offset);
             requestHeader.setMaxMsgNums(maxNums);
             requestHeader.setSysFlag(sysFlagInner);
@@ -278,15 +196,17 @@ public class PullAPIWrapper {
 
             String brokerAddr = findBrokerResult.getBrokerAddr();
             if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
-                brokerAddr = computePullFromWhichFilterServer(topic, 
brokerAddr);
+                brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), 
brokerAddr);
             }
 
-            return this.mQClientFactory.getMQClientAPIImpl().pullMessage(
+            PullResult pullResult = 
this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                 brokerAddr,
                 requestHeader,
                 timeoutMillis,
                 communicationMode,
                 pullCallback);
+
+            return pullResult;
         }
 
         throw new MQClientException("The broker[" + mq.getBrokerName() + "] 
not exist", null);
@@ -403,230 +323,4 @@ public class PullAPIWrapper {
         throw new MQClientException("The broker[" + mq.getBrokerName() + "] 
not exist", null);
     }
 
-    private class LogicalQueueContext implements PullCallback {
-        private final MessageQueue mq;
-        private final String subExpression;
-        private final String expressionType;
-        private final long subVersion;
-        private final long offset;
-        private final int maxNums;
-        private final int sysFlag;
-        private final long commitOffset;
-        private final long brokerSuspendMaxTimeMillis;
-        private final long timeoutMillis;
-        private final CommunicationMode communicationMode;
-        private final PullCallback pullCallback;
-
-        private volatile LogicalQueuesInfo logicalQueuesInfo;
-        private volatile LogicalQueueRouteData logicalQueueRouteData;
-        private volatile boolean isMaxReadableQueueRoute;
-
-        private volatile PullResultExt pullResult = null;
-
-        private final AtomicInteger retry = new AtomicInteger();
-
-        public LogicalQueueContext(MessageQueue mq, String subExpression, 
String expressionType, long subVersion,
-            long offset, int maxNums, int sysFlag, long commitOffset, long 
brokerSuspendMaxTimeMillis,
-            long timeoutMillis, CommunicationMode communicationMode,
-            PullCallback pullCallback) {
-            this.mq = mq;
-            this.subExpression = subExpression;
-            this.expressionType = expressionType;
-            this.subVersion = subVersion;
-            this.offset = offset;
-            this.maxNums = maxNums;
-            this.sysFlag = sysFlag;
-            this.commitOffset = commitOffset;
-            this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis;
-            this.timeoutMillis = timeoutMillis;
-            this.communicationMode = communicationMode;
-            this.pullCallback = pullCallback;
-
-            this.buildLogicalQueuesInfo();
-        }
-
-        private boolean notUsingLogicalQueue() {
-            return !Objects.equal(mq.getBrokerName(), 
MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME) || this.logicalQueuesInfo == null;
-        }
-
-        private void buildLogicalQueuesInfo() {
-            TopicRouteData topicRouteData = 
PullAPIWrapper.this.mQClientFactory.queryTopicRouteData(mq.getTopic());
-            if (topicRouteData != null) {
-                //TODO
-                //this.logicalQueuesInfo = 
topicRouteData.getLogicalQueuesInfo();
-            }
-        }
-
-        @Override public void onSuccess(PullResult pullResult) {
-            this.pullCallback.onSuccess(this.wrapPullResult(pullResult));
-        }
-
-        @Override public void onException(Throwable t) {
-            if (!this.shouldRetry(t)) {
-                this.pullCallback.onException(t);
-                return;
-            }
-            MessageQueue messageQueue = this.getModifiedMessageQueue();
-            if (messageQueue == null) {
-                this.pullCallback.onSuccess(this.getPullResult());
-                return;
-            }
-            try {
-                PullAPIWrapper.this.pullKernelImplWithoutRetry(messageQueue, 
subExpression, expressionType, subVersion, this.getModifiedOffset(), maxNums, 
sysFlag, this.getModifiedCommitOffset(), brokerSuspendMaxTimeMillis, 
timeoutMillis, communicationMode, this);
-            } catch (Exception e) {
-                this.pullCallback.onException(e);
-            }
-        }
-
-        public MessageQueue getModifiedMessageQueue() {
-            if (this.notUsingLogicalQueue()) {
-                return this.mq;
-            }
-            this.logicalQueuesInfo.readLock().lock();
-            try {
-                List<LogicalQueueRouteData> queueRouteDataList = 
fromNullable(this.logicalQueuesInfo.get(this.mq.getQueueId())).or(Collections.<LogicalQueueRouteData>emptyList());
-                LogicalQueueRouteData searchKey = new LogicalQueueRouteData();
-                searchKey.setState(MessageQueueRouteState.Normal);
-                searchKey.setLogicalQueueDelta(offset);
-                // it's sorted after getTopicRouteInfoFromNameServer
-                int startIdx = Collections.binarySearch(queueRouteDataList, 
searchKey);
-                if (startIdx < 0) {
-                    startIdx = -startIdx - 1;
-                    // lower entry
-                    startIdx -= 1;
-                }
-                this.logicalQueueRouteData = null;
-                this.pullResult = null;
-                LogicalQueueRouteData lastReadableLogicalQueueRouteData = 
null; // first item which delta > offset
-                LogicalQueueRouteData minReadableLogicalQueueRouteData = null;
-                LogicalQueueRouteData maxReadableLogicalQueueRouteData = null;
-                for (int i = 0, size = queueRouteDataList.size(); i < size; 
i++) {
-                    LogicalQueueRouteData queueRouteData = 
queueRouteDataList.get(i);
-                    if (!queueRouteData.isReadable()) {
-                        continue;
-                    }
-                    maxReadableLogicalQueueRouteData = queueRouteData;
-                    if (minReadableLogicalQueueRouteData == null) {
-                        minReadableLogicalQueueRouteData = queueRouteData;
-                        if (i < startIdx) {
-                            // must consider following `i++` operation when 
invoke `continue`, so decrease first
-                            i = startIdx - 1;
-                            continue;
-                        }
-                    }
-                    if (queueRouteData.getLogicalQueueDelta() > offset) {
-                        if (this.logicalQueueRouteData != null) {
-                            if 
(this.logicalQueueRouteData.toLogicalQueueOffset(this.logicalQueueRouteData.getOffsetMax())
 <= offset) {
-                                this.logicalQueueRouteData = queueRouteData;
-                            }
-                            break;
-                        } else {
-                            if (lastReadableLogicalQueueRouteData == null) {
-                                lastReadableLogicalQueueRouteData = 
queueRouteData;
-                            }
-                        }
-                    } else {
-                        this.logicalQueueRouteData = queueRouteData;
-                    }
-                }
-                if (this.logicalQueueRouteData == null) {
-                    if (lastReadableLogicalQueueRouteData != null) {
-                        this.pullResult = new 
PullResultExt(PullStatus.OFFSET_ILLEGAL, 
lastReadableLogicalQueueRouteData.getLogicalQueueDelta(), 
minReadableLogicalQueueRouteData.getLogicalQueueDelta(), 
maxReadableLogicalQueueRouteData.getLogicalQueueDelta(), null, 0, null);
-                        return null;
-                    } else {
-                        if (maxReadableLogicalQueueRouteData != null) {
-                            this.logicalQueueRouteData = 
maxReadableLogicalQueueRouteData;
-                        } else {
-                            if (!queueRouteDataList.isEmpty()) {
-                                this.logicalQueueRouteData = 
queueRouteDataList.get(queueRouteDataList.size() - 1);
-                            } else {
-                                pullResult = new 
PullResultExt(PullStatus.NO_NEW_MSG, 0, 0, 0, null, 0, null);
-                                return null;
-                            }
-                        }
-                    }
-                }
-                this.isMaxReadableQueueRoute = 
this.logicalQueueRouteData.isSameTo(maxReadableLogicalQueueRouteData);
-                return this.logicalQueueRouteData.getMessageQueue();
-            } finally {
-                this.logicalQueuesInfo.readLock().unlock();
-            }
-        }
-
-        public PullResultExt getPullResult() {
-            return pullResult;
-        }
-
-        public PullCallback wrapPullCallback() {
-            if (this.notUsingLogicalQueue()) {
-                return this.pullCallback;
-            }
-            if (!CommunicationMode.ASYNC.equals(this.communicationMode)) {
-                return this.pullCallback;
-            }
-            return this;
-        }
-
-        public long getModifiedOffset() {
-            return 
this.logicalQueueRouteData.toMessageQueueOffset(this.offset);
-        }
-
-        public long getModifiedCommitOffset() {
-            // TODO should this be modified too? If offset is not in current 
broker's range, how do we handle it?
-            return this.commitOffset;
-        }
-
-        public void incrRetry() {
-            this.retry.incrementAndGet();
-        }
-
-        public boolean shouldRetry(Throwable t) {
-            this.incrRetry();
-            if (this.retry.get() >= 3) {
-                return false;
-            }
-            if (t instanceof MQRedirectException) {
-                MQRedirectException e = (MQRedirectException) t;
-                this.processResponseBody(e.getBody());
-                return true;
-            }
-            return false;
-        }
-
-        public PullResult wrapPullResult(PullResult pullResult) {
-            if (pullResult == null) {
-                return null;
-            }
-            if (this.logicalQueueRouteData == null) {
-                return pullResult;
-            }
-            if (!this.isMaxReadableQueueRoute && 
PullStatus.NO_MATCHED_MSG.equals(pullResult.getPullStatus())) {
-                PullStatus status = PullStatus.OFFSET_ILLEGAL;
-                if (pullResult instanceof PullResultExt) {
-                    PullResultExt pullResultExt = (PullResultExt) pullResult;
-                    pullResult = new PullResultExt(status, 
pullResultExt.getNextBeginOffset(), pullResultExt.getMinOffset(), 
pullResultExt.getMaxOffset(), pullResultExt.getMsgFoundList(), 
pullResultExt.getSuggestWhichBrokerId(), pullResultExt.getMessageBinary());
-                } else {
-                    pullResult = new PullResult(status, 
pullResult.getNextBeginOffset(), pullResult.getMinOffset(), 
pullResult.getMaxOffset(), pullResult.getMsgFoundList());
-                }
-            }
-            // method PullAPIWrapper#processPullResult will modify 
queueOffset/nextBeginOffset/minOffset/maxOffset
-            return new PullResultWithLogicalQueues(pullResult, 
this.logicalQueueRouteData);
-        }
-
-        public void processResponseBody(byte[] responseBody) {
-            log.info("LogicalQueueContext.processResponseBody got redirect {}: 
{}", this.logicalQueueRouteData, responseBody != null ? new 
String(responseBody, MessageDecoder.CHARSET_UTF8) : null);
-            if (responseBody != null) {
-                try {
-                    List<LogicalQueueRouteData> queueRouteDataList = 
JSON.parseObject(responseBody, MixAll.TYPE_LIST_LOGICAL_QUEUE_ROUTE_DATA);
-                    
this.logicalQueuesInfo.updateLogicalQueueRouteDataList(this.mq.getQueueId(), 
queueRouteDataList);
-                    return;
-                } catch (Exception e) {
-                    log.warn("LogicalQueueContext.processResponseBody {} 
update exception, fallback to updateTopicRouteInfoFromNameServer", 
this.logicalQueueRouteData, e);
-                }
-            }
-            //TODO
-            
//PullAPIWrapper.this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic(),
 false, null, Collections.singleton(this.mq.getQueueId()));
-            this.buildLogicalQueuesInfo();
-        }
-    }
 }
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerLogicalQueueTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerLogicalQueueTest.java
deleted file mode 100644
index 15ec564..0000000
--- 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerLogicalQueueTest.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.consumer;
-
-import com.alibaba.fastjson.JSON;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQRedirectException;
-import org.apache.rocketmq.client.impl.CommunicationMode;
-import org.apache.rocketmq.client.impl.FindBrokerResult;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
-import org.apache.rocketmq.client.impl.consumer.PullResultExt;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
-import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
-import org.apache.rocketmq.common.protocol.route.QueueData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.assertj.core.util.Lists;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isNull;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public class DefaultMQPullConsumerLogicalQueueTest {
-    private MQClientInstance mQClientFactory;
-    @Mock
-    private MQClientAPIImpl mQClientAPIImpl;
-    private DefaultMQPullConsumer pullConsumer;
-    private String topic;
-    private static final String cluster = "DefaultCluster";
-    private static final String broker1Name = "BrokerA";
-    private static final String broker1Addr = "127.0.0.2:10911";
-    private static final String broker2Name = "BrokerB";
-    private static final String broker2Addr = "127.0.0.3:10911";
-
-    @Before
-    public void init() throws Exception {
-        topic = "FooBar" + System.nanoTime();
-
-        mQClientFactory = 
spy(MQClientManager.getInstance().getOrCreateMQClientInstance(new 
ClientConfig()));
-
-        FieldUtils.writeField(mQClientFactory, "mQClientAPIImpl", 
mQClientAPIImpl, true);
-
-        pullConsumer = new DefaultMQPullConsumer("FooBarGroup" + 
System.nanoTime());
-        pullConsumer.setNamesrvAddr("127.0.0.1:9876");
-        pullConsumer.start();
-
-        PullAPIWrapper pullAPIWrapper = 
pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper();
-        FieldUtils.writeDeclaredField(pullAPIWrapper, "mQClientFactory", 
mQClientFactory, true);
-
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRouteData());
-
-        doReturn(new FindBrokerResult(broker1Addr, 
false)).when(mQClientFactory).findBrokerAddressInSubscribe(eq(broker1Name), 
anyLong(), anyBoolean());
-        doReturn(new FindBrokerResult(broker2Addr, 
false)).when(mQClientFactory).findBrokerAddressInSubscribe(eq(broker2Name), 
anyLong(), anyBoolean());
-    }
-
-    @After
-    public void terminate() {
-        pullConsumer.shutdown();
-    }
-
-    @Test
-    public void testStart_OffsetShouldNotNUllAfterStart() {
-        Assert.assertNotNull(pullConsumer.getOffsetStore());
-    }
-
-    @Test
-    public void testPullMessage_Success() throws Exception {
-        doAnswer(new Answer<PullResultExt>() {
-            @Override public PullResultExt answer(InvocationOnMock mock) 
throws Throwable {
-                PullMessageRequestHeader requestHeader = mock.getArgument(1);
-                return 
DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader, 
PullStatus.FOUND, Collections.singletonList(new MessageExt()));
-            }
-        }).when(mQClientAPIImpl).pullMessage(eq(broker1Addr), 
any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), 
(PullCallback) isNull());
-
-        MessageQueue messageQueue = new MessageQueue(topic, 
MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0);
-        PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3);
-        assertThat(pullResult).isNotNull();
-        assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
-        assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
-        assertThat(pullResult.getMinOffset()).isEqualTo(123);
-        assertThat(pullResult.getMaxOffset()).isEqualTo(2048);
-        
assertThat(pullResult.getMsgFoundList()).isEqualTo(Collections.emptyList());
-    }
-
-    @Test
-    public void testPullMessage_NotFound() throws Exception {
-        doAnswer(new Answer<PullResult>() {
-            @Override public PullResult answer(InvocationOnMock mock) throws 
Throwable {
-                PullMessageRequestHeader requestHeader = mock.getArgument(1);
-                return 
DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader, 
PullStatus.NO_NEW_MSG, new ArrayList<MessageExt>());
-            }
-        }).when(mQClientAPIImpl).pullMessage(eq(broker1Addr), 
any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), 
(PullCallback) isNull());
-
-        MessageQueue messageQueue = new MessageQueue(topic, 
MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0);
-        PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3);
-        
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG);
-    }
-
-    @Test
-    public void testPullMessageAsync_Success() throws Exception {
-        doAnswer(new Answer<PullResult>() {
-            @Override public PullResult answer(InvocationOnMock mock) throws 
Throwable {
-                PullMessageRequestHeader requestHeader = mock.getArgument(1);
-                PullResult pullResult = 
DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader, 
PullStatus.FOUND, Collections.singletonList(new MessageExt()));
-
-                PullCallback pullCallback = mock.getArgument(4);
-                pullCallback.onSuccess(pullResult);
-                return null;
-            }
-        }).when(mQClientAPIImpl).pullMessage(eq(broker1Addr), 
any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.ASYNC), 
any(PullCallback.class));
-
-        final SettableFuture<PullResult> future = SettableFuture.create();
-        MessageQueue messageQueue = new MessageQueue(topic, broker1Name, 0);
-        pullConsumer.pull(messageQueue, "*", 1024, 3, new PullCallback() {
-            @Override
-            public void onSuccess(PullResult pullResult) {
-                future.set(pullResult);
-            }
-
-            @Override
-            public void onException(Throwable e) {
-                future.setException(e);
-            }
-        });
-        PullResult pullResult = future.get(3, TimeUnit.SECONDS);
-        assertThat(pullResult).isNotNull();
-        assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
-        assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
-        assertThat(pullResult.getMinOffset()).isEqualTo(123);
-        assertThat(pullResult.getMaxOffset()).isEqualTo(2048);
-        
assertThat(pullResult.getMsgFoundList()).isEqualTo(Collections.emptyList());
-    }
-
-    @Test
-    public void testPullMessageSync_Redirect() throws Exception {
-        doAnswer(new Answer<PullResult>() {
-            @Override public PullResult answer(InvocationOnMock mock) throws 
Throwable {
-                throw new 
MQRedirectException(JSON.toJSONBytes(ImmutableList.of(
-                    new LogicalQueueRouteData(0, 0, new MessageQueue(topic, 
broker1Name, 0), MessageQueueRouteState.Expired, 0, 0, 0, 0, broker1Addr),
-                    new LogicalQueueRouteData(0, 10, new MessageQueue(topic, 
broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr)
-                )));
-            }
-        }).when(mQClientAPIImpl).pullMessage(eq(broker1Addr), 
any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), 
(PullCallback) isNull());
-        doAnswer(new Answer<PullResult>() {
-            @Override public PullResult answer(InvocationOnMock mock) throws 
Throwable {
-                PullMessageRequestHeader requestHeader = mock.getArgument(1);
-                return 
DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader, 
PullStatus.FOUND, Collections.singletonList(new MessageExt()));
-            }
-        }).when(mQClientAPIImpl).pullMessage(eq(broker2Addr), 
any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), 
(PullCallback) isNull());
-
-        MessageQueue messageQueue = new MessageQueue(topic, 
MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0);
-        PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3);
-        assertThat(pullResult).isNotNull();
-        assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
-        assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
-        assertThat(pullResult.getMinOffset()).isEqualTo(123 + 10);
-        assertThat(pullResult.getMaxOffset()).isEqualTo(2048 + 10);
-        
assertThat(pullResult.getMsgFoundList()).isEqualTo(Collections.emptyList());
-    }
-
-    private TopicRouteData createTopicRouteData() {
-        TopicRouteData topicRouteData = new TopicRouteData();
-
-        topicRouteData.setFilterServerTable(new HashMap<String, 
List<String>>());
-        topicRouteData.setBrokerDatas(ImmutableList.of(
-            new BrokerData(cluster, broker1Name, new HashMap<Long, 
String>(Collections.singletonMap(MixAll.MASTER_ID, broker1Addr))),
-            new BrokerData(cluster, broker2Name, new HashMap<Long, 
String>(Collections.singletonMap(MixAll.MASTER_ID, broker2Addr)))
-        ));
-
-        List<QueueData> queueDataList = new ArrayList<QueueData>();
-        QueueData queueData;
-        queueData = new QueueData();
-        queueData.setBrokerName(broker1Name);
-        queueData.setPerm(6);
-        queueData.setReadQueueNums(3);
-        queueData.setWriteQueueNums(4);
-        queueData.setTopicSysFlag(0);
-        queueDataList.add(queueData);
-        queueData = new QueueData();
-        queueData.setBrokerName(broker2Name);
-        queueData.setPerm(6);
-        queueData.setReadQueueNums(3);
-        queueData.setWriteQueueNums(4);
-        queueData.setTopicSysFlag(0);
-        queueDataList.add(queueData);
-        topicRouteData.setQueueDatas(queueDataList);
-
-        LogicalQueuesInfo info = new LogicalQueuesInfo();
-        info.put(0, Lists.newArrayList(new LogicalQueueRouteData(0, 0, new 
MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.Normal, 0, 0, 0, 0, 
broker1Addr)));
-        topicRouteData.setLogicalQueuesInfo(info);
-        return topicRouteData;
-    }
-
-    private PullResultExt createPullResult(PullMessageRequestHeader 
requestHeader, PullStatus pullStatus,
-        List<MessageExt> messageExtList) throws Exception {
-        return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + 
messageExtList.size(), 123, 2048, messageExtList, 0, new byte[] {});
-    }
-}
\ No newline at end of file
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerLogicalQueueTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerLogicalQueueTest.java
deleted file mode 100644
index 12d5cba..0000000
--- 
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerLogicalQueueTest.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.producer;
-
-import com.alibaba.fastjson.JSON;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.exception.MQRedirectException;
-import org.apache.rocketmq.client.hook.SendMessageContext;
-import org.apache.rocketmq.client.impl.CommunicationMode;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
-import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
-import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
-import org.apache.rocketmq.common.protocol.route.QueueData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.assertj.core.api.ThrowableAssert;
-import org.assertj.core.util.Lists;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isNull;
-import static org.mockito.ArgumentMatchers.nullable;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public class DefaultMQProducerLogicalQueueTest {
-    private MQClientInstance mQClientFactory;
-    @Mock
-    private MQClientAPIImpl mQClientAPIImpl;
-
-    private DefaultMQProducer producer;
-    private Message message;
-    private String topic;
-
-    private MessageQueue messageQueue;
-
-    private static final String cluster = "DefaultCluster";
-    private static final String broker1Name = "broker1";
-    private static final String broker2Name = "broker2";
-    private static final String broker1Addr = "127.0.0.2:10911";
-    private static final String broker2Addr = "127.0.0.3:10911";
-
-    @Before
-    public void init() throws Exception {
-        topic = "Foobar" + System.nanoTime();
-        messageQueue = new MessageQueue(topic, 
MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0);
-
-        ConcurrentMap<String, MQClientInstance> factoryTable = 
(ConcurrentMap<String/* clientId */, MQClientInstance>) 
FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", 
true);
-        for (MQClientInstance instance : factoryTable.values()) {
-            instance.shutdown();
-        }
-        factoryTable.clear();
-
-        mQClientFactory = 
spy(MQClientManager.getInstance().getOrCreateMQClientInstance(new 
ClientConfig()));
-        factoryTable.put(new ClientConfig().buildMQClientId(), 
mQClientFactory);
-
-        String producerGroupTemp = "FooBar_PID" + System.nanoTime();
-        producer = new DefaultMQProducer(producerGroupTemp);
-        producer.setNamesrvAddr("127.0.0.1:9876");
-        producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
-        message = new Message(topic, new byte[] {'a'});
-
-        mQClientFactory.registerProducer(producerGroupTemp, 
producer.getDefaultMQProducerImpl());
-
-        producer.start();
-
-        FieldUtils.writeDeclaredField(producer.getDefaultMQProducerImpl(), 
"mQClientFactory", mQClientFactory, true);
-        FieldUtils.writeField(mQClientFactory, "mQClientAPIImpl", 
mQClientAPIImpl, true);
-
-        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), 
any(Message.class), any(SendMessageRequestHeader.class), anyLong(), 
any(CommunicationMode.class),
-            nullable(SendMessageContext.class), 
any(DefaultMQProducerImpl.class))).thenCallRealMethod();
-        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), 
any(Message.class), any(SendMessageRequestHeader.class), anyLong(), 
any(CommunicationMode.class),
-            (SendCallback) isNull(), nullable(TopicPublishInfo.class), 
nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), 
any(DefaultMQProducerImpl.class)))
-            .thenReturn(createSendResult(SendStatus.SEND_OK));
-        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), 
any(Message.class), any(SendMessageRequestHeader.class), anyLong(), 
any(CommunicationMode.class),
-            any(SendCallback.class), nullable(TopicPublishInfo.class), 
nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), 
any(DefaultMQProducerImpl.class)))
-            .thenAnswer(new Answer<SendResult>() {
-                @Override public SendResult answer(InvocationOnMock 
invocation) throws Throwable {
-                    SendCallback sendCallback = invocation.getArgument(6);
-                    
sendCallback.onSuccess(DefaultMQProducerLogicalQueueTest.this.createSendResult(SendStatus.SEND_OK));
-                    return null;
-                }
-            });
-    }
-
-    @After
-    public void terminate() {
-        producer.shutdown();
-    }
-
-    @Test
-    public void testSendMessageSync_Success() throws Exception {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
-        SendResult sendResult = producer.send(message, messageQueue);
-
-        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
-        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
-        assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
-    }
-
-    @Test
-    public void testSendMessageSync_Redirect() throws Exception {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
-
-        when(mQClientAPIImpl.sendMessage(eq(broker1Addr), eq(broker1Name), 
any(Message.class), any(SendMessageRequestHeader.class), anyLong(), 
eq(CommunicationMode.SYNC),
-            (SendCallback) isNull(), nullable(TopicPublishInfo.class), 
nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), 
any(DefaultMQProducerImpl.class)))
-            .thenThrow(new MQRedirectException(null));
-
-        assertThatThrownBy(new ThrowableAssert.ThrowingCallable() {
-            @Override public void call() throws Throwable {
-                producer.send(message, messageQueue);
-            }
-        
}).isInstanceOf(MQBrokerException.class).hasMessageContaining("redirect");
-
-        when(mQClientAPIImpl.sendMessage(eq(broker1Addr), eq(broker1Name), 
any(Message.class), any(SendMessageRequestHeader.class), anyLong(), 
eq(CommunicationMode.SYNC),
-            (SendCallback) isNull(), nullable(TopicPublishInfo.class), 
nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), 
any(DefaultMQProducerImpl.class)))
-            .thenThrow(new 
MQRedirectException(JSON.toJSONBytes(ImmutableList.of(
-                new LogicalQueueRouteData(0, 0, new MessageQueue(topic, 
broker1Name, 0), MessageQueueRouteState.Expired, 0, 0, 0, 0, broker1Addr),
-                new LogicalQueueRouteData(0, 10, new MessageQueue(topic, 
broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr)))));
-        when(mQClientAPIImpl.sendMessage(eq(broker2Addr), eq(broker2Name), 
any(Message.class), any(SendMessageRequestHeader.class), anyLong(), 
eq(CommunicationMode.SYNC),
-            (SendCallback) isNull(), nullable(TopicPublishInfo.class), 
nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), 
any(DefaultMQProducerImpl.class)))
-            .thenReturn(createSendResult(SendStatus.SEND_OK));
-
-        SendResult sendResult = producer.send(message, messageQueue);
-        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
-        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
-        assertThat(sendResult.getQueueOffset()).isEqualTo(466L);
-    }
-
-    @Test
-    public void testSendMessageSync_RemotingException() throws Exception {
-        TopicRouteData topicRouteData = createTopicRoute();
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(topicRouteData);
-
-        when(mQClientAPIImpl.sendMessage(eq(broker1Addr), eq(broker1Name), 
any(Message.class), any(SendMessageRequestHeader.class), anyLong(), 
eq(CommunicationMode.SYNC),
-            (SendCallback) isNull(), nullable(TopicPublishInfo.class), 
nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), 
any(DefaultMQProducerImpl.class)))
-            .thenThrow(new RemotingConnectException(broker1Addr));
-        SendResult returnSendResult = createSendResult(SendStatus.SEND_OK);
-        when(mQClientAPIImpl.sendMessage(eq(broker2Addr), eq(broker2Name), 
any(Message.class), any(SendMessageRequestHeader.class), anyLong(), 
eq(CommunicationMode.SYNC),
-            (SendCallback) isNull(), nullable(TopicPublishInfo.class), 
nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), 
any(DefaultMQProducerImpl.class)))
-            .thenReturn(returnSendResult);
-
-        assertThatThrownBy(new ThrowableAssert.ThrowingCallable() {
-            @Override public void call() throws Throwable {
-                producer.send(message, messageQueue);
-            }
-        
}).isInstanceOf(RemotingConnectException.class).hasMessageContaining(broker1Addr);
-
-        topicRouteData.getLogicalQueuesInfo().get(0).add(new 
LogicalQueueRouteData(0, -1, new MessageQueue(topic, broker2Name, 1), 
MessageQueueRouteState.WriteOnly, 0, -1, -1, -1, broker2Addr));
-
-        SendResult sendResult = producer.send(message, messageQueue);
-        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
-        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
-        assertThat(sendResult.getQueueOffset()).isEqualTo(-1L);
-    }
-
-    @Test
-    public void testSendMessageAsync_Success() throws Exception {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
-
-        final SettableFuture<SendResult> future = SettableFuture.create();
-        producer.send(message, messageQueue, new SendCallback() {
-            @Override
-            public void onSuccess(SendResult sendResult) {
-                future.set(sendResult);
-            }
-
-            @Override
-            public void onException(Throwable e) {
-                future.setException(e);
-            }
-        });
-
-        SendResult sendResult = future.get(3, TimeUnit.SECONDS);
-        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
-        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
-        assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
-    }
-
-    @Test
-    public void testSendMessageAsync() throws Exception {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
-
-        final AtomicReference<SettableFuture<SendResult>> future = new 
AtomicReference<SettableFuture<SendResult>>();
-        SendCallback sendCallback = new SendCallback() {
-            @Override
-            public void onSuccess(SendResult sendResult) {
-                future.get().set(sendResult);
-            }
-
-            @Override
-            public void onException(Throwable e) {
-                future.get().setException(e);
-            }
-        };
-
-        Message message = new Message();
-        message.setTopic("test");
-        message.setBody("hello world".getBytes());
-        future.set(SettableFuture.<SendResult>create());
-        producer.send(new Message(), messageQueue, sendCallback);
-        assertThatThrownBy(new ThrowableAssert.ThrowingCallable() {
-            @Override public void call() throws Throwable {
-                future.get().get(3, TimeUnit.SECONDS);
-            }
-        
}).hasCauseInstanceOf(MQClientException.class).hasMessageContaining("The 
specified topic is blank");
-
-        //this message is send success
-        message.setTopic(topic);
-        future.set(SettableFuture.<SendResult>create());
-        producer.send(message, messageQueue, sendCallback, 1000);
-        future.get().get(3, TimeUnit.SECONDS);
-    }
-
-    public TopicRouteData createTopicRoute() {
-        TopicRouteData topicRouteData = new TopicRouteData();
-
-        topicRouteData.setFilterServerTable(new HashMap<String, 
List<String>>());
-        topicRouteData.setBrokerDatas(ImmutableList.of(
-            new BrokerData(cluster, broker1Name, new HashMap<Long, 
String>(Collections.singletonMap(MixAll.MASTER_ID, broker1Addr))),
-            new BrokerData(cluster, broker2Name, new HashMap<Long, 
String>(Collections.singletonMap(MixAll.MASTER_ID, broker2Addr)))
-        ));
-
-        List<QueueData> queueDataList = new ArrayList<QueueData>();
-        QueueData queueData;
-        queueData = new QueueData();
-        queueData.setBrokerName(broker1Name);
-        queueData.setPerm(6);
-        queueData.setReadQueueNums(3);
-        queueData.setWriteQueueNums(4);
-        queueData.setTopicSysFlag(0);
-        queueDataList.add(queueData);
-        queueData = new QueueData();
-        queueData.setBrokerName(broker2Name);
-        queueData.setPerm(6);
-        queueData.setReadQueueNums(3);
-        queueData.setWriteQueueNums(4);
-        queueData.setTopicSysFlag(0);
-        queueDataList.add(queueData);
-        topicRouteData.setQueueDatas(queueDataList);
-
-        LogicalQueuesInfo info = new LogicalQueuesInfo();
-        info.put(0, Lists.newArrayList(new LogicalQueueRouteData(0, 0, new 
MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.Normal, 0, 0, 0, 0, 
broker1Addr)));
-        topicRouteData.setLogicalQueuesInfo(info);
-        return topicRouteData;
-    }
-
-    private SendResult createSendResult(SendStatus sendStatus) {
-        SendResult sendResult = new SendResult();
-        sendResult.setMsgId("123");
-        sendResult.setOffsetMsgId("123");
-        sendResult.setQueueOffset(456);
-        sendResult.setSendStatus(sendStatus);
-        sendResult.setRegionId("HZ");
-        sendResult.setMessageQueue(new MessageQueue(topic, broker1Name, 0));
-        return sendResult;
-    }
-}
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index a8906b3..5f29fe1 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -22,7 +22,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -49,20 +48,21 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
 import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -153,7 +153,7 @@ public class DefaultMQProducerTest {
 
     @Test
     public void testSendMessageSync_Success() throws RemotingException, 
InterruptedException, MQBrokerException, MQClientException {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
         SendResult sendResult = producer.send(message);
 
         assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
@@ -163,7 +163,7 @@ public class DefaultMQProducerTest {
 
     @Test
     public void testSendMessageSync_WithBodyCompressed() throws 
RemotingException, InterruptedException, MQBrokerException, MQClientException {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
         SendResult sendResult = producer.send(bigMessage);
 
         assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
@@ -174,7 +174,7 @@ public class DefaultMQProducerTest {
     @Test
     public void testSendMessageAsync_Success() throws RemotingException, 
InterruptedException, MQBrokerException, MQClientException {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
         producer.send(message, new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
@@ -197,7 +197,7 @@ public class DefaultMQProducerTest {
         final AtomicInteger cc = new AtomicInteger(0);
         final CountDownLatch countDownLatch = new CountDownLatch(6);
 
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
         SendCallback sendCallback = new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
@@ -239,7 +239,7 @@ public class DefaultMQProducerTest {
         final AtomicInteger cc = new AtomicInteger(0);
         final CountDownLatch countDownLatch = new CountDownLatch(4);
 
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
         SendCallback sendCallback = new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
@@ -260,7 +260,7 @@ public class DefaultMQProducerTest {
             }
         };
 
-        List<Message> msgs = new ArrayList<Message>();
+        List<Message> msgs = new ArrayList<>();
         for (int i = 0; i < 5; i++) {
             Message message = new Message();
             message.setTopic("test");
@@ -281,7 +281,7 @@ public class DefaultMQProducerTest {
     @Test
     public void testSendMessageAsync_BodyCompressed() throws 
RemotingException, InterruptedException, MQBrokerException, MQClientException {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
         producer.send(bigMessage, new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
@@ -300,7 +300,7 @@ public class DefaultMQProducerTest {
 
     @Test
     public void testSendMessageSync_SuccessWithHook() throws Throwable {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
         final Throwable[] assertionErrors = new Throwable[1];
         final CountDownLatch countDownLatch = new CountDownLatch(2);
         producer.getDefaultMQProducerImpl().registerSendMessageHook(new 
SendMessageHook() {
@@ -368,7 +368,7 @@ public class DefaultMQProducerTest {
 
     @Test
     public void testRequestMessage() throws RemotingException, 
RequestTimeoutException, MQClientException, InterruptedException, 
MQBrokerException {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
         final AtomicBoolean finish = new AtomicBoolean(false);
         new Thread(new Runnable() {
             @Override public void run() {
@@ -394,13 +394,13 @@ public class DefaultMQProducerTest {
 
     @Test(expected = RequestTimeoutException.class)
     public void testRequestMessage_RequestTimeoutException() throws 
RemotingException, RequestTimeoutException, MQClientException, 
InterruptedException, MQBrokerException {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
         Message result = producer.request(message, 3 * 1000L);
     }
 
     @Test
     public void testAsyncRequest_OnSuccess() throws Exception {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         RequestCallback requestCallback = new RequestCallback() {
             @Override public void onSuccess(Message message) {
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
index 0a1f685..5d64a93 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
@@ -20,11 +20,6 @@ package org.apache.rocketmq.client.trace;
 import io.opentracing.mock.MockSpan;
 import io.opentracing.mock.MockTracer;
 import io.opentracing.tag.Tags;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -52,14 +47,17 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -115,7 +113,7 @@ public class DefaultMQProducerWithOpenTracingTest {
     @Test
     public void testSendMessageSync_WithTrace_Success() throws 
RemotingException, InterruptedException, MQBrokerException, MQClientException {
         
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp,
 producer.getDefaultMQProducerImpl());
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
         producer.send(message);
         assertThat(tracer.finishedSpans().size()).isEqualTo(1);
         MockSpan span = tracer.finishedSpans().get(0);
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
index 64f63c5..234e32e 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
@@ -17,13 +17,6 @@
 
 package org.apache.rocketmq.client.trace;
 
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -50,17 +43,18 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.nullable;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -128,7 +122,7 @@ public class DefaultMQProducerWithTraceTest {
     @Test
     public void testSendMessageSync_WithTrace_Success() throws 
RemotingException, InterruptedException, MQBrokerException, MQClientException {
         
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp,
 traceProducer.getDefaultMQProducerImpl());
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         try {
             producer.send(message);
@@ -140,7 +134,7 @@ public class DefaultMQProducerWithTraceTest {
 
     @Test
     public void testSendMessageSync_WithTrace_NoBrokerSet_Exception() throws 
RemotingException, InterruptedException, MQBrokerException, MQClientException {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         try {
             producer.send(message);
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
index aca6254..dd6d108 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
@@ -20,12 +20,6 @@ package org.apache.rocketmq.client.trace;
 import io.opentracing.mock.MockSpan;
 import io.opentracing.mock.MockTracer;
 import io.opentracing.tag.Tags;
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -59,14 +53,18 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -133,7 +131,7 @@ public class TransactionMQProducerWithOpenTracingTest {
     @Test
     public void testSendMessageSync_WithTrace_Success() throws 
RemotingException, InterruptedException, MQBrokerException, MQClientException {
         
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp,
 producer.getDefaultMQProducerImpl());
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
         producer.sendMessageInTransaction(message, null);
 
         assertThat(tracer.finishedSpans().size()).isEqualTo(2);
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
index b3a4414..f838817 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
@@ -17,13 +17,6 @@
 
 package org.apache.rocketmq.client.trace;
 
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -57,20 +50,19 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
 import org.mockito.Spy;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
+
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.when;
 
@@ -135,7 +127,7 @@ public class TransactionMQProducerWithTraceTest {
 
         Field fieldHooks = 
DefaultMQProducerImpl.class.getDeclaredField("endTransactionHookList");
         fieldHooks.setAccessible(true);
-        List<EndTransactionHook>hooks = new ArrayList<EndTransactionHook>();
+        List<EndTransactionHook>hooks = new ArrayList<>();
         hooks.add(endTransactionHook);
         fieldHooks.set(producer.getDefaultMQProducerImpl(), hooks);
 
@@ -150,14 +142,12 @@ public class TransactionMQProducerWithTraceTest {
     @Test
     public void testSendMessageSync_WithTrace_Success() throws 
RemotingException, InterruptedException, MQBrokerException, MQClientException {
         
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp,
 traceProducer.getDefaultMQProducerImpl());
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong(), anyBoolean(), 
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
-        final AtomicReference<EndTransactionContext> context = new 
AtomicReference<EndTransactionContext>();
-        doAnswer(new Answer() {
-            @Override public Object answer(InvocationOnMock mock) throws 
Throwable {
-                context.set(mock.<EndTransactionContext>getArgument(0));
-                return null;
-            }
-        
}).when(endTransactionHook).endTransaction(ArgumentMatchers.<EndTransactionContext>any());
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
+        AtomicReference<EndTransactionContext> context = new 
AtomicReference<>();
+        doAnswer(mock -> {
+            context.set(mock.getArgument(0));
+            return null;
+        }).when(endTransactionHook).endTransaction(any());
         producer.sendMessageInTransaction(message, null);
 
         EndTransactionContext ctx = context.get();
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteDataNameSrv.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteDataNameSrv.java
deleted file mode 100644
index e9fb84e..0000000
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteDataNameSrv.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.common.protocol.route;
-
-import com.google.common.base.Objects;
-
-public class TopicRouteDataNameSrv extends TopicRouteData {
-    private LogicalQueuesInfoUnordered logicalQueuesInfoUnordered;
-
-    public TopicRouteDataNameSrv() {
-    }
-
-    public LogicalQueuesInfoUnordered getLogicalQueuesInfoUnordered() {
-        return logicalQueuesInfoUnordered;
-    }
-
-    public void setLogicalQueuesInfoUnordered(
-        LogicalQueuesInfoUnordered logicalQueuesInfoUnordered) {
-        this.logicalQueuesInfoUnordered = logicalQueuesInfoUnordered;
-    }
-
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-        if (!super.equals(o))
-            return false;
-        TopicRouteDataNameSrv srv = (TopicRouteDataNameSrv) o;
-        return Objects.equal(logicalQueuesInfoUnordered, 
srv.logicalQueuesInfoUnordered);
-    }
-
-    @Override public int hashCode() {
-        return Objects.hashCode(super.hashCode(), logicalQueuesInfoUnordered);
-    }
-
-    @Override public String toString() {
-        return "TopicRouteDataNameSrv{" +
-            "logicalQueuesInfoUnordered=" + logicalQueuesInfoUnordered +
-            "} " + super.toString();
-    }
-
-    public TopicRouteData toTopicRouteData() {
-        TopicRouteData topicRouteData = new TopicRouteData(this);
-        if (this.logicalQueuesInfoUnordered != null) {
-            
topicRouteData.setLogicalQueuesInfo(this.logicalQueuesInfoUnordered.toLogicalQueuesInfoOrdered());
-        }
-        return topicRouteData;
-    }
-}
diff --git 
a/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java 
b/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java
index 87a0fc0..428a928 100644
--- 
a/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java
+++ 
b/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
+import 
org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import static org.junit.Assert.assertEquals;
 import org.junit.Test;
@@ -29,7 +30,7 @@ public class RegisterBrokerBodyTest {
     @Test
     public void test_encode_decode() throws IOException {
         RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
-        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new 
TopicConfigSerializeWrapper();
+        TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper = 
new TopicConfigAndMappingSerializeWrapper();
         
registerBrokerBody.setTopicConfigSerializeWrapper(topicConfigSerializeWrapper);
         
         ConcurrentMap<String, TopicConfig> topicConfigTable = new 
ConcurrentHashMap<String, TopicConfig>();
diff --git 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
index 38bff74..e80dec7 100644
--- 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
+++ 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
@@ -16,42 +16,27 @@
  */
 package org.apache.rocketmq.namesrv.processor;
 
-import com.alibaba.fastjson.JSON;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.rocketmq.common.DataVersion;
-import org.apache.rocketmq.common.MQVersion;
-import org.apache.rocketmq.common.MixAll;
+
 import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.namesrv.NamesrvConfig;
 import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader;
-import 
org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
-import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
-import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
-import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.namesrv.NamesrvController;
 import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;

Reply via email to