This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new 1f89733 Polish compile errrors
1f89733 is described below
commit 1f897336097e34bb02546dfdd80c814382adb87a
Author: dongeforever <[email protected]>
AuthorDate: Tue Nov 9 20:20:27 2021 +0800
Polish compile errrors
---
.../client/consumer/DefaultMQPushConsumer.java | 2 +-
.../rocketmq/client/impl/MQClientAPIImpl.java | 5 -
.../client/impl/consumer/PullAPIWrapper.java | 350 ++-------------------
.../DefaultMQPullConsumerLogicalQueueTest.java | 248 ---------------
.../DefaultMQProducerLogicalQueueTest.java | 311 ------------------
.../client/producer/DefaultMQProducerTest.java | 28 +-
.../DefaultMQProducerWithOpenTracingTest.java | 14 +-
.../trace/DefaultMQProducerWithTraceTest.java | 26 +-
.../TransactionMQProducerWithOpenTracingTest.java | 16 +-
.../trace/TransactionMQProducerWithTraceTest.java | 40 +--
.../protocol/route/TopicRouteDataNameSrv.java | 64 ----
.../rocketmq/common/RegisterBrokerBodyTest.java | 3 +-
.../processor/DefaultRequestProcessorTest.java | 17 +-
13 files changed, 78 insertions(+), 1046 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index cf8cbb0..8a6340b 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -691,7 +691,7 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
public void sendMessageBack(MessageExt msg, int delayLevel)
throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
msg.setTopic(withNamespace(msg.getTopic()));
- this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
+ this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel,
(String) null);
}
/**
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index c07f9fa..67fd937 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.client.impl;
-import com.google.common.base.Function;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -180,9 +179,7 @@ import
org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
@@ -201,8 +198,6 @@ import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-import static com.google.common.base.Optional.fromNullable;
-
public class MQClientAPIImpl {
private final static InternalLogger log = ClientLogger.getLog();
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index 6d966a6..8cd4bab 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -16,15 +16,20 @@
*/
package org.apache.rocketmq.client.impl.consumer;
-import com.alibaba.fastjson.JSON;
-import com.google.common.base.Objects;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.PopCallback;
+import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.exception.MQRedirectException;
import org.apache.rocketmq.client.hook.FilterMessageContext;
import org.apache.rocketmq.client.hook.FilterMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
@@ -34,35 +39,18 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
-import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.exception.RemotingException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static com.google.common.base.Optional.fromNullable;
-
public class PullAPIWrapper {
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mQClientFactory;
@@ -83,36 +71,13 @@ public class PullAPIWrapper {
public PullResult processPullResult(final MessageQueue mq, final
PullResult pullResult,
final SubscriptionData subscriptionData) {
- final PullResultExt pullResultExt = (PullResultExt) pullResult;
-
- LogicalQueueRouteData queueRouteData = null;
- PullResultWithLogicalQueues pullResultWithLogicalQueues = null;
- if (pullResultExt instanceof PullResultWithLogicalQueues) {
- pullResultWithLogicalQueues = (PullResultWithLogicalQueues)
pullResultExt;
- queueRouteData = pullResultWithLogicalQueues.getQueueRouteData();
- }
-
- if (queueRouteData != null) {
- pullResultWithLogicalQueues.setOrigPullResultExt(new
PullResultExt(pullResultExt.getPullStatus(),
-
queueRouteData.toLogicalQueueOffset(pullResultExt.getNextBeginOffset()),
-
queueRouteData.toLogicalQueueOffset(pullResultExt.getMinOffset()),
- // although this maxOffset may not belong to this queue route,
but the actual value must be a larger one, and since maxOffset here is not an
accurate value, we just do it to make things simple.
-
queueRouteData.toLogicalQueueOffset(pullResultExt.getMaxOffset()),
- pullResultExt.getMsgFoundList(),
- pullResultExt.getSuggestWhichBrokerId(),
- pullResultExt.getMessageBinary()));
- }
+ PullResultExt pullResultExt = (PullResultExt) pullResult;
this.updatePullFromWhichNode(mq,
pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus()) {
ByteBuffer byteBuffer =
ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
- if (queueRouteData != null) {
- // prevent pulled data is out of current queue route, this
happens when some commit log data is cleaned in the broker but still pull from
it.
- msgList = queueRouteData.filterMessages(msgList);
- }
-
List<MessageExt> msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() &&
!subscriptionData.isClassFilterMode()) {
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
@@ -142,10 +107,6 @@ public class PullAPIWrapper {
MessageAccessor.putProperty(msg,
MessageConst.PROPERTY_MAX_OFFSET,
Long.toString(pullResult.getMaxOffset()));
msg.setBrokerName(mq.getBrokerName());
- msg.setQueueId(mq.getQueueId());
- if (queueRouteData != null) {
-
msg.setQueueOffset(queueRouteData.toLogicalQueueOffset(msg.getQueueOffset()));
- }
}
pullResultExt.setMsgFoundList(msgListFilterAgain);
@@ -153,7 +114,7 @@ public class PullAPIWrapper {
pullResultExt.setMessageBinary(null);
- return pullResultExt;
+ return pullResult;
}
public void updatePullFromWhichNode(final MessageQueue mq, final long
brokerId) {
@@ -182,67 +143,24 @@ public class PullAPIWrapper {
}
public PullResult pullKernelImpl(
- MessageQueue mq,
+ final MessageQueue mq,
final String subExpression,
final String expressionType,
final long subVersion,
- long offset,
+ final long offset,
final int maxNums,
final int sysFlag,
- long commitOffset,
+ final long commitOffset,
final long brokerSuspendMaxTimeMillis,
final long timeoutMillis,
final CommunicationMode communicationMode,
- PullCallback pullCallback
+ final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException {
- if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(mq.getBrokerName())) {
- LogicalQueueContext logicalQueueContext = new
LogicalQueueContext(mq, subExpression, expressionType, subVersion, offset,
maxNums, sysFlag, commitOffset, brokerSuspendMaxTimeMillis, timeoutMillis,
communicationMode, pullCallback);
- while (true) {
- try {
- MessageQueue messageQueue =
logicalQueueContext.getModifiedMessageQueue();
- if (messageQueue == null) {
- if (pullCallback != null) {
-
pullCallback.onSuccess(logicalQueueContext.getPullResult());
- return null;
- } else {
- return logicalQueueContext.getPullResult();
- }
- }
- PullResult pullResult =
this.pullKernelImplWithoutRetry(messageQueue, subExpression, expressionType,
subVersion, logicalQueueContext.getModifiedOffset(), maxNums, sysFlag,
logicalQueueContext.getModifiedCommitOffset(), brokerSuspendMaxTimeMillis,
timeoutMillis, communicationMode, logicalQueueContext.wrapPullCallback());
- return logicalQueueContext.wrapPullResult(pullResult);
- } catch (MQRedirectException e) {
- if (!logicalQueueContext.shouldRetry(e)) {
- throw new MQBrokerException(ResponseCode.SYSTEM_ERROR,
"redirect");
- }
- }
- }
- } else {
- return this.pullKernelImplWithoutRetry(mq, subExpression,
expressionType, subVersion, offset, maxNums, sysFlag, commitOffset,
brokerSuspendMaxTimeMillis, timeoutMillis, communicationMode, pullCallback);
- }
- }
-
- public PullResult pullKernelImplWithoutRetry(
- MessageQueue mq,
- final String subExpression,
- final String expressionType,
- final long subVersion,
- long offset,
- final int maxNums,
- final int sysFlag,
- long commitOffset,
- final long brokerSuspendMaxTimeMillis,
- final long timeoutMillis,
- final CommunicationMode communicationMode,
- PullCallback pullCallback
- ) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException {
- String topic = mq.getTopic();
- int queueId = mq.getQueueId();
-
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq),
this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) {
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq),
this.recalculatePullFromWhichNode(mq), false);
@@ -265,8 +183,8 @@ public class PullAPIWrapper {
PullMessageRequestHeader requestHeader = new
PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
- requestHeader.setTopic(topic);
- requestHeader.setQueueId(queueId);
+ requestHeader.setTopic(mq.getTopic());
+ requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
@@ -278,15 +196,17 @@ public class PullAPIWrapper {
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
- brokerAddr = computePullFromWhichFilterServer(topic,
brokerAddr);
+ brokerAddr = computePullFromWhichFilterServer(mq.getTopic(),
brokerAddr);
}
- return this.mQClientFactory.getMQClientAPIImpl().pullMessage(
+ PullResult pullResult =
this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback);
+
+ return pullResult;
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "]
not exist", null);
@@ -403,230 +323,4 @@ public class PullAPIWrapper {
throw new MQClientException("The broker[" + mq.getBrokerName() + "]
not exist", null);
}
- private class LogicalQueueContext implements PullCallback {
- private final MessageQueue mq;
- private final String subExpression;
- private final String expressionType;
- private final long subVersion;
- private final long offset;
- private final int maxNums;
- private final int sysFlag;
- private final long commitOffset;
- private final long brokerSuspendMaxTimeMillis;
- private final long timeoutMillis;
- private final CommunicationMode communicationMode;
- private final PullCallback pullCallback;
-
- private volatile LogicalQueuesInfo logicalQueuesInfo;
- private volatile LogicalQueueRouteData logicalQueueRouteData;
- private volatile boolean isMaxReadableQueueRoute;
-
- private volatile PullResultExt pullResult = null;
-
- private final AtomicInteger retry = new AtomicInteger();
-
- public LogicalQueueContext(MessageQueue mq, String subExpression,
String expressionType, long subVersion,
- long offset, int maxNums, int sysFlag, long commitOffset, long
brokerSuspendMaxTimeMillis,
- long timeoutMillis, CommunicationMode communicationMode,
- PullCallback pullCallback) {
- this.mq = mq;
- this.subExpression = subExpression;
- this.expressionType = expressionType;
- this.subVersion = subVersion;
- this.offset = offset;
- this.maxNums = maxNums;
- this.sysFlag = sysFlag;
- this.commitOffset = commitOffset;
- this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis;
- this.timeoutMillis = timeoutMillis;
- this.communicationMode = communicationMode;
- this.pullCallback = pullCallback;
-
- this.buildLogicalQueuesInfo();
- }
-
- private boolean notUsingLogicalQueue() {
- return !Objects.equal(mq.getBrokerName(),
MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME) || this.logicalQueuesInfo == null;
- }
-
- private void buildLogicalQueuesInfo() {
- TopicRouteData topicRouteData =
PullAPIWrapper.this.mQClientFactory.queryTopicRouteData(mq.getTopic());
- if (topicRouteData != null) {
- //TODO
- //this.logicalQueuesInfo =
topicRouteData.getLogicalQueuesInfo();
- }
- }
-
- @Override public void onSuccess(PullResult pullResult) {
- this.pullCallback.onSuccess(this.wrapPullResult(pullResult));
- }
-
- @Override public void onException(Throwable t) {
- if (!this.shouldRetry(t)) {
- this.pullCallback.onException(t);
- return;
- }
- MessageQueue messageQueue = this.getModifiedMessageQueue();
- if (messageQueue == null) {
- this.pullCallback.onSuccess(this.getPullResult());
- return;
- }
- try {
- PullAPIWrapper.this.pullKernelImplWithoutRetry(messageQueue,
subExpression, expressionType, subVersion, this.getModifiedOffset(), maxNums,
sysFlag, this.getModifiedCommitOffset(), brokerSuspendMaxTimeMillis,
timeoutMillis, communicationMode, this);
- } catch (Exception e) {
- this.pullCallback.onException(e);
- }
- }
-
- public MessageQueue getModifiedMessageQueue() {
- if (this.notUsingLogicalQueue()) {
- return this.mq;
- }
- this.logicalQueuesInfo.readLock().lock();
- try {
- List<LogicalQueueRouteData> queueRouteDataList =
fromNullable(this.logicalQueuesInfo.get(this.mq.getQueueId())).or(Collections.<LogicalQueueRouteData>emptyList());
- LogicalQueueRouteData searchKey = new LogicalQueueRouteData();
- searchKey.setState(MessageQueueRouteState.Normal);
- searchKey.setLogicalQueueDelta(offset);
- // it's sorted after getTopicRouteInfoFromNameServer
- int startIdx = Collections.binarySearch(queueRouteDataList,
searchKey);
- if (startIdx < 0) {
- startIdx = -startIdx - 1;
- // lower entry
- startIdx -= 1;
- }
- this.logicalQueueRouteData = null;
- this.pullResult = null;
- LogicalQueueRouteData lastReadableLogicalQueueRouteData =
null; // first item which delta > offset
- LogicalQueueRouteData minReadableLogicalQueueRouteData = null;
- LogicalQueueRouteData maxReadableLogicalQueueRouteData = null;
- for (int i = 0, size = queueRouteDataList.size(); i < size;
i++) {
- LogicalQueueRouteData queueRouteData =
queueRouteDataList.get(i);
- if (!queueRouteData.isReadable()) {
- continue;
- }
- maxReadableLogicalQueueRouteData = queueRouteData;
- if (minReadableLogicalQueueRouteData == null) {
- minReadableLogicalQueueRouteData = queueRouteData;
- if (i < startIdx) {
- // must consider following `i++` operation when
invoke `continue`, so decrease first
- i = startIdx - 1;
- continue;
- }
- }
- if (queueRouteData.getLogicalQueueDelta() > offset) {
- if (this.logicalQueueRouteData != null) {
- if
(this.logicalQueueRouteData.toLogicalQueueOffset(this.logicalQueueRouteData.getOffsetMax())
<= offset) {
- this.logicalQueueRouteData = queueRouteData;
- }
- break;
- } else {
- if (lastReadableLogicalQueueRouteData == null) {
- lastReadableLogicalQueueRouteData =
queueRouteData;
- }
- }
- } else {
- this.logicalQueueRouteData = queueRouteData;
- }
- }
- if (this.logicalQueueRouteData == null) {
- if (lastReadableLogicalQueueRouteData != null) {
- this.pullResult = new
PullResultExt(PullStatus.OFFSET_ILLEGAL,
lastReadableLogicalQueueRouteData.getLogicalQueueDelta(),
minReadableLogicalQueueRouteData.getLogicalQueueDelta(),
maxReadableLogicalQueueRouteData.getLogicalQueueDelta(), null, 0, null);
- return null;
- } else {
- if (maxReadableLogicalQueueRouteData != null) {
- this.logicalQueueRouteData =
maxReadableLogicalQueueRouteData;
- } else {
- if (!queueRouteDataList.isEmpty()) {
- this.logicalQueueRouteData =
queueRouteDataList.get(queueRouteDataList.size() - 1);
- } else {
- pullResult = new
PullResultExt(PullStatus.NO_NEW_MSG, 0, 0, 0, null, 0, null);
- return null;
- }
- }
- }
- }
- this.isMaxReadableQueueRoute =
this.logicalQueueRouteData.isSameTo(maxReadableLogicalQueueRouteData);
- return this.logicalQueueRouteData.getMessageQueue();
- } finally {
- this.logicalQueuesInfo.readLock().unlock();
- }
- }
-
- public PullResultExt getPullResult() {
- return pullResult;
- }
-
- public PullCallback wrapPullCallback() {
- if (this.notUsingLogicalQueue()) {
- return this.pullCallback;
- }
- if (!CommunicationMode.ASYNC.equals(this.communicationMode)) {
- return this.pullCallback;
- }
- return this;
- }
-
- public long getModifiedOffset() {
- return
this.logicalQueueRouteData.toMessageQueueOffset(this.offset);
- }
-
- public long getModifiedCommitOffset() {
- // TODO should this be modified too? If offset is not in current
broker's range, how do we handle it?
- return this.commitOffset;
- }
-
- public void incrRetry() {
- this.retry.incrementAndGet();
- }
-
- public boolean shouldRetry(Throwable t) {
- this.incrRetry();
- if (this.retry.get() >= 3) {
- return false;
- }
- if (t instanceof MQRedirectException) {
- MQRedirectException e = (MQRedirectException) t;
- this.processResponseBody(e.getBody());
- return true;
- }
- return false;
- }
-
- public PullResult wrapPullResult(PullResult pullResult) {
- if (pullResult == null) {
- return null;
- }
- if (this.logicalQueueRouteData == null) {
- return pullResult;
- }
- if (!this.isMaxReadableQueueRoute &&
PullStatus.NO_MATCHED_MSG.equals(pullResult.getPullStatus())) {
- PullStatus status = PullStatus.OFFSET_ILLEGAL;
- if (pullResult instanceof PullResultExt) {
- PullResultExt pullResultExt = (PullResultExt) pullResult;
- pullResult = new PullResultExt(status,
pullResultExt.getNextBeginOffset(), pullResultExt.getMinOffset(),
pullResultExt.getMaxOffset(), pullResultExt.getMsgFoundList(),
pullResultExt.getSuggestWhichBrokerId(), pullResultExt.getMessageBinary());
- } else {
- pullResult = new PullResult(status,
pullResult.getNextBeginOffset(), pullResult.getMinOffset(),
pullResult.getMaxOffset(), pullResult.getMsgFoundList());
- }
- }
- // method PullAPIWrapper#processPullResult will modify
queueOffset/nextBeginOffset/minOffset/maxOffset
- return new PullResultWithLogicalQueues(pullResult,
this.logicalQueueRouteData);
- }
-
- public void processResponseBody(byte[] responseBody) {
- log.info("LogicalQueueContext.processResponseBody got redirect {}:
{}", this.logicalQueueRouteData, responseBody != null ? new
String(responseBody, MessageDecoder.CHARSET_UTF8) : null);
- if (responseBody != null) {
- try {
- List<LogicalQueueRouteData> queueRouteDataList =
JSON.parseObject(responseBody, MixAll.TYPE_LIST_LOGICAL_QUEUE_ROUTE_DATA);
-
this.logicalQueuesInfo.updateLogicalQueueRouteDataList(this.mq.getQueueId(),
queueRouteDataList);
- return;
- } catch (Exception e) {
- log.warn("LogicalQueueContext.processResponseBody {}
update exception, fallback to updateTopicRouteInfoFromNameServer",
this.logicalQueueRouteData, e);
- }
- }
- //TODO
-
//PullAPIWrapper.this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic(),
false, null, Collections.singleton(this.mq.getQueueId()));
- this.buildLogicalQueuesInfo();
- }
- }
}
diff --git
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerLogicalQueueTest.java
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerLogicalQueueTest.java
deleted file mode 100644
index 15ec564..0000000
---
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerLogicalQueueTest.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.consumer;
-
-import com.alibaba.fastjson.JSON;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQRedirectException;
-import org.apache.rocketmq.client.impl.CommunicationMode;
-import org.apache.rocketmq.client.impl.FindBrokerResult;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
-import org.apache.rocketmq.client.impl.consumer.PullResultExt;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
-import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
-import org.apache.rocketmq.common.protocol.route.QueueData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.assertj.core.util.Lists;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isNull;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public class DefaultMQPullConsumerLogicalQueueTest {
- private MQClientInstance mQClientFactory;
- @Mock
- private MQClientAPIImpl mQClientAPIImpl;
- private DefaultMQPullConsumer pullConsumer;
- private String topic;
- private static final String cluster = "DefaultCluster";
- private static final String broker1Name = "BrokerA";
- private static final String broker1Addr = "127.0.0.2:10911";
- private static final String broker2Name = "BrokerB";
- private static final String broker2Addr = "127.0.0.3:10911";
-
- @Before
- public void init() throws Exception {
- topic = "FooBar" + System.nanoTime();
-
- mQClientFactory =
spy(MQClientManager.getInstance().getOrCreateMQClientInstance(new
ClientConfig()));
-
- FieldUtils.writeField(mQClientFactory, "mQClientAPIImpl",
mQClientAPIImpl, true);
-
- pullConsumer = new DefaultMQPullConsumer("FooBarGroup" +
System.nanoTime());
- pullConsumer.setNamesrvAddr("127.0.0.1:9876");
- pullConsumer.start();
-
- PullAPIWrapper pullAPIWrapper =
pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper();
- FieldUtils.writeDeclaredField(pullAPIWrapper, "mQClientFactory",
mQClientFactory, true);
-
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRouteData());
-
- doReturn(new FindBrokerResult(broker1Addr,
false)).when(mQClientFactory).findBrokerAddressInSubscribe(eq(broker1Name),
anyLong(), anyBoolean());
- doReturn(new FindBrokerResult(broker2Addr,
false)).when(mQClientFactory).findBrokerAddressInSubscribe(eq(broker2Name),
anyLong(), anyBoolean());
- }
-
- @After
- public void terminate() {
- pullConsumer.shutdown();
- }
-
- @Test
- public void testStart_OffsetShouldNotNUllAfterStart() {
- Assert.assertNotNull(pullConsumer.getOffsetStore());
- }
-
- @Test
- public void testPullMessage_Success() throws Exception {
- doAnswer(new Answer<PullResultExt>() {
- @Override public PullResultExt answer(InvocationOnMock mock)
throws Throwable {
- PullMessageRequestHeader requestHeader = mock.getArgument(1);
- return
DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader,
PullStatus.FOUND, Collections.singletonList(new MessageExt()));
- }
- }).when(mQClientAPIImpl).pullMessage(eq(broker1Addr),
any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC),
(PullCallback) isNull());
-
- MessageQueue messageQueue = new MessageQueue(topic,
MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0);
- PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3);
- assertThat(pullResult).isNotNull();
- assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
- assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
- assertThat(pullResult.getMinOffset()).isEqualTo(123);
- assertThat(pullResult.getMaxOffset()).isEqualTo(2048);
-
assertThat(pullResult.getMsgFoundList()).isEqualTo(Collections.emptyList());
- }
-
- @Test
- public void testPullMessage_NotFound() throws Exception {
- doAnswer(new Answer<PullResult>() {
- @Override public PullResult answer(InvocationOnMock mock) throws
Throwable {
- PullMessageRequestHeader requestHeader = mock.getArgument(1);
- return
DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader,
PullStatus.NO_NEW_MSG, new ArrayList<MessageExt>());
- }
- }).when(mQClientAPIImpl).pullMessage(eq(broker1Addr),
any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC),
(PullCallback) isNull());
-
- MessageQueue messageQueue = new MessageQueue(topic,
MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0);
- PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3);
-
assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG);
- }
-
- @Test
- public void testPullMessageAsync_Success() throws Exception {
- doAnswer(new Answer<PullResult>() {
- @Override public PullResult answer(InvocationOnMock mock) throws
Throwable {
- PullMessageRequestHeader requestHeader = mock.getArgument(1);
- PullResult pullResult =
DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader,
PullStatus.FOUND, Collections.singletonList(new MessageExt()));
-
- PullCallback pullCallback = mock.getArgument(4);
- pullCallback.onSuccess(pullResult);
- return null;
- }
- }).when(mQClientAPIImpl).pullMessage(eq(broker1Addr),
any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.ASYNC),
any(PullCallback.class));
-
- final SettableFuture<PullResult> future = SettableFuture.create();
- MessageQueue messageQueue = new MessageQueue(topic, broker1Name, 0);
- pullConsumer.pull(messageQueue, "*", 1024, 3, new PullCallback() {
- @Override
- public void onSuccess(PullResult pullResult) {
- future.set(pullResult);
- }
-
- @Override
- public void onException(Throwable e) {
- future.setException(e);
- }
- });
- PullResult pullResult = future.get(3, TimeUnit.SECONDS);
- assertThat(pullResult).isNotNull();
- assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
- assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
- assertThat(pullResult.getMinOffset()).isEqualTo(123);
- assertThat(pullResult.getMaxOffset()).isEqualTo(2048);
-
assertThat(pullResult.getMsgFoundList()).isEqualTo(Collections.emptyList());
- }
-
- @Test
- public void testPullMessageSync_Redirect() throws Exception {
- doAnswer(new Answer<PullResult>() {
- @Override public PullResult answer(InvocationOnMock mock) throws
Throwable {
- throw new
MQRedirectException(JSON.toJSONBytes(ImmutableList.of(
- new LogicalQueueRouteData(0, 0, new MessageQueue(topic,
broker1Name, 0), MessageQueueRouteState.Expired, 0, 0, 0, 0, broker1Addr),
- new LogicalQueueRouteData(0, 10, new MessageQueue(topic,
broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr)
- )));
- }
- }).when(mQClientAPIImpl).pullMessage(eq(broker1Addr),
any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC),
(PullCallback) isNull());
- doAnswer(new Answer<PullResult>() {
- @Override public PullResult answer(InvocationOnMock mock) throws
Throwable {
- PullMessageRequestHeader requestHeader = mock.getArgument(1);
- return
DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader,
PullStatus.FOUND, Collections.singletonList(new MessageExt()));
- }
- }).when(mQClientAPIImpl).pullMessage(eq(broker2Addr),
any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC),
(PullCallback) isNull());
-
- MessageQueue messageQueue = new MessageQueue(topic,
MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0);
- PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3);
- assertThat(pullResult).isNotNull();
- assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
- assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1);
- assertThat(pullResult.getMinOffset()).isEqualTo(123 + 10);
- assertThat(pullResult.getMaxOffset()).isEqualTo(2048 + 10);
-
assertThat(pullResult.getMsgFoundList()).isEqualTo(Collections.emptyList());
- }
-
- private TopicRouteData createTopicRouteData() {
- TopicRouteData topicRouteData = new TopicRouteData();
-
- topicRouteData.setFilterServerTable(new HashMap<String,
List<String>>());
- topicRouteData.setBrokerDatas(ImmutableList.of(
- new BrokerData(cluster, broker1Name, new HashMap<Long,
String>(Collections.singletonMap(MixAll.MASTER_ID, broker1Addr))),
- new BrokerData(cluster, broker2Name, new HashMap<Long,
String>(Collections.singletonMap(MixAll.MASTER_ID, broker2Addr)))
- ));
-
- List<QueueData> queueDataList = new ArrayList<QueueData>();
- QueueData queueData;
- queueData = new QueueData();
- queueData.setBrokerName(broker1Name);
- queueData.setPerm(6);
- queueData.setReadQueueNums(3);
- queueData.setWriteQueueNums(4);
- queueData.setTopicSysFlag(0);
- queueDataList.add(queueData);
- queueData = new QueueData();
- queueData.setBrokerName(broker2Name);
- queueData.setPerm(6);
- queueData.setReadQueueNums(3);
- queueData.setWriteQueueNums(4);
- queueData.setTopicSysFlag(0);
- queueDataList.add(queueData);
- topicRouteData.setQueueDatas(queueDataList);
-
- LogicalQueuesInfo info = new LogicalQueuesInfo();
- info.put(0, Lists.newArrayList(new LogicalQueueRouteData(0, 0, new
MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.Normal, 0, 0, 0, 0,
broker1Addr)));
- topicRouteData.setLogicalQueuesInfo(info);
- return topicRouteData;
- }
-
- private PullResultExt createPullResult(PullMessageRequestHeader
requestHeader, PullStatus pullStatus,
- List<MessageExt> messageExtList) throws Exception {
- return new PullResultExt(pullStatus, requestHeader.getQueueOffset() +
messageExtList.size(), 123, 2048, messageExtList, 0, new byte[] {});
- }
-}
\ No newline at end of file
diff --git
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerLogicalQueueTest.java
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerLogicalQueueTest.java
deleted file mode 100644
index 12d5cba..0000000
---
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerLogicalQueueTest.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.producer;
-
-import com.alibaba.fastjson.JSON;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.exception.MQRedirectException;
-import org.apache.rocketmq.client.hook.SendMessageContext;
-import org.apache.rocketmq.client.impl.CommunicationMode;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
-import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
-import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
-import org.apache.rocketmq.common.protocol.route.QueueData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.assertj.core.api.ThrowableAssert;
-import org.assertj.core.util.Lists;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isNull;
-import static org.mockito.ArgumentMatchers.nullable;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public class DefaultMQProducerLogicalQueueTest {
- private MQClientInstance mQClientFactory;
- @Mock
- private MQClientAPIImpl mQClientAPIImpl;
-
- private DefaultMQProducer producer;
- private Message message;
- private String topic;
-
- private MessageQueue messageQueue;
-
- private static final String cluster = "DefaultCluster";
- private static final String broker1Name = "broker1";
- private static final String broker2Name = "broker2";
- private static final String broker1Addr = "127.0.0.2:10911";
- private static final String broker2Addr = "127.0.0.3:10911";
-
- @Before
- public void init() throws Exception {
- topic = "Foobar" + System.nanoTime();
- messageQueue = new MessageQueue(topic,
MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0);
-
- ConcurrentMap<String, MQClientInstance> factoryTable =
(ConcurrentMap<String/* clientId */, MQClientInstance>)
FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable",
true);
- for (MQClientInstance instance : factoryTable.values()) {
- instance.shutdown();
- }
- factoryTable.clear();
-
- mQClientFactory =
spy(MQClientManager.getInstance().getOrCreateMQClientInstance(new
ClientConfig()));
- factoryTable.put(new ClientConfig().buildMQClientId(),
mQClientFactory);
-
- String producerGroupTemp = "FooBar_PID" + System.nanoTime();
- producer = new DefaultMQProducer(producerGroupTemp);
- producer.setNamesrvAddr("127.0.0.1:9876");
- producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
- message = new Message(topic, new byte[] {'a'});
-
- mQClientFactory.registerProducer(producerGroupTemp,
producer.getDefaultMQProducerImpl());
-
- producer.start();
-
- FieldUtils.writeDeclaredField(producer.getDefaultMQProducerImpl(),
"mQClientFactory", mQClientFactory, true);
- FieldUtils.writeField(mQClientFactory, "mQClientAPIImpl",
mQClientAPIImpl, true);
-
- when(mQClientAPIImpl.sendMessage(anyString(), anyString(),
any(Message.class), any(SendMessageRequestHeader.class), anyLong(),
any(CommunicationMode.class),
- nullable(SendMessageContext.class),
any(DefaultMQProducerImpl.class))).thenCallRealMethod();
- when(mQClientAPIImpl.sendMessage(anyString(), anyString(),
any(Message.class), any(SendMessageRequestHeader.class), anyLong(),
any(CommunicationMode.class),
- (SendCallback) isNull(), nullable(TopicPublishInfo.class),
nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class),
any(DefaultMQProducerImpl.class)))
- .thenReturn(createSendResult(SendStatus.SEND_OK));
- when(mQClientAPIImpl.sendMessage(anyString(), anyString(),
any(Message.class), any(SendMessageRequestHeader.class), anyLong(),
any(CommunicationMode.class),
- any(SendCallback.class), nullable(TopicPublishInfo.class),
nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class),
any(DefaultMQProducerImpl.class)))
- .thenAnswer(new Answer<SendResult>() {
- @Override public SendResult answer(InvocationOnMock
invocation) throws Throwable {
- SendCallback sendCallback = invocation.getArgument(6);
-
sendCallback.onSuccess(DefaultMQProducerLogicalQueueTest.this.createSendResult(SendStatus.SEND_OK));
- return null;
- }
- });
- }
-
- @After
- public void terminate() {
- producer.shutdown();
- }
-
- @Test
- public void testSendMessageSync_Success() throws Exception {
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
- SendResult sendResult = producer.send(message, messageQueue);
-
- assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
- assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
- assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
- }
-
- @Test
- public void testSendMessageSync_Redirect() throws Exception {
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
-
- when(mQClientAPIImpl.sendMessage(eq(broker1Addr), eq(broker1Name),
any(Message.class), any(SendMessageRequestHeader.class), anyLong(),
eq(CommunicationMode.SYNC),
- (SendCallback) isNull(), nullable(TopicPublishInfo.class),
nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class),
any(DefaultMQProducerImpl.class)))
- .thenThrow(new MQRedirectException(null));
-
- assertThatThrownBy(new ThrowableAssert.ThrowingCallable() {
- @Override public void call() throws Throwable {
- producer.send(message, messageQueue);
- }
-
}).isInstanceOf(MQBrokerException.class).hasMessageContaining("redirect");
-
- when(mQClientAPIImpl.sendMessage(eq(broker1Addr), eq(broker1Name),
any(Message.class), any(SendMessageRequestHeader.class), anyLong(),
eq(CommunicationMode.SYNC),
- (SendCallback) isNull(), nullable(TopicPublishInfo.class),
nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class),
any(DefaultMQProducerImpl.class)))
- .thenThrow(new
MQRedirectException(JSON.toJSONBytes(ImmutableList.of(
- new LogicalQueueRouteData(0, 0, new MessageQueue(topic,
broker1Name, 0), MessageQueueRouteState.Expired, 0, 0, 0, 0, broker1Addr),
- new LogicalQueueRouteData(0, 10, new MessageQueue(topic,
broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr)))));
- when(mQClientAPIImpl.sendMessage(eq(broker2Addr), eq(broker2Name),
any(Message.class), any(SendMessageRequestHeader.class), anyLong(),
eq(CommunicationMode.SYNC),
- (SendCallback) isNull(), nullable(TopicPublishInfo.class),
nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class),
any(DefaultMQProducerImpl.class)))
- .thenReturn(createSendResult(SendStatus.SEND_OK));
-
- SendResult sendResult = producer.send(message, messageQueue);
- assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
- assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
- assertThat(sendResult.getQueueOffset()).isEqualTo(466L);
- }
-
- @Test
- public void testSendMessageSync_RemotingException() throws Exception {
- TopicRouteData topicRouteData = createTopicRoute();
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(topicRouteData);
-
- when(mQClientAPIImpl.sendMessage(eq(broker1Addr), eq(broker1Name),
any(Message.class), any(SendMessageRequestHeader.class), anyLong(),
eq(CommunicationMode.SYNC),
- (SendCallback) isNull(), nullable(TopicPublishInfo.class),
nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class),
any(DefaultMQProducerImpl.class)))
- .thenThrow(new RemotingConnectException(broker1Addr));
- SendResult returnSendResult = createSendResult(SendStatus.SEND_OK);
- when(mQClientAPIImpl.sendMessage(eq(broker2Addr), eq(broker2Name),
any(Message.class), any(SendMessageRequestHeader.class), anyLong(),
eq(CommunicationMode.SYNC),
- (SendCallback) isNull(), nullable(TopicPublishInfo.class),
nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class),
any(DefaultMQProducerImpl.class)))
- .thenReturn(returnSendResult);
-
- assertThatThrownBy(new ThrowableAssert.ThrowingCallable() {
- @Override public void call() throws Throwable {
- producer.send(message, messageQueue);
- }
-
}).isInstanceOf(RemotingConnectException.class).hasMessageContaining(broker1Addr);
-
- topicRouteData.getLogicalQueuesInfo().get(0).add(new
LogicalQueueRouteData(0, -1, new MessageQueue(topic, broker2Name, 1),
MessageQueueRouteState.WriteOnly, 0, -1, -1, -1, broker2Addr));
-
- SendResult sendResult = producer.send(message, messageQueue);
- assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
- assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
- assertThat(sendResult.getQueueOffset()).isEqualTo(-1L);
- }
-
- @Test
- public void testSendMessageAsync_Success() throws Exception {
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
-
- final SettableFuture<SendResult> future = SettableFuture.create();
- producer.send(message, messageQueue, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- future.set(sendResult);
- }
-
- @Override
- public void onException(Throwable e) {
- future.setException(e);
- }
- });
-
- SendResult sendResult = future.get(3, TimeUnit.SECONDS);
- assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
- assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
- assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
- }
-
- @Test
- public void testSendMessageAsync() throws Exception {
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
-
- final AtomicReference<SettableFuture<SendResult>> future = new
AtomicReference<SettableFuture<SendResult>>();
- SendCallback sendCallback = new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- future.get().set(sendResult);
- }
-
- @Override
- public void onException(Throwable e) {
- future.get().setException(e);
- }
- };
-
- Message message = new Message();
- message.setTopic("test");
- message.setBody("hello world".getBytes());
- future.set(SettableFuture.<SendResult>create());
- producer.send(new Message(), messageQueue, sendCallback);
- assertThatThrownBy(new ThrowableAssert.ThrowingCallable() {
- @Override public void call() throws Throwable {
- future.get().get(3, TimeUnit.SECONDS);
- }
-
}).hasCauseInstanceOf(MQClientException.class).hasMessageContaining("The
specified topic is blank");
-
- //this message is send success
- message.setTopic(topic);
- future.set(SettableFuture.<SendResult>create());
- producer.send(message, messageQueue, sendCallback, 1000);
- future.get().get(3, TimeUnit.SECONDS);
- }
-
- public TopicRouteData createTopicRoute() {
- TopicRouteData topicRouteData = new TopicRouteData();
-
- topicRouteData.setFilterServerTable(new HashMap<String,
List<String>>());
- topicRouteData.setBrokerDatas(ImmutableList.of(
- new BrokerData(cluster, broker1Name, new HashMap<Long,
String>(Collections.singletonMap(MixAll.MASTER_ID, broker1Addr))),
- new BrokerData(cluster, broker2Name, new HashMap<Long,
String>(Collections.singletonMap(MixAll.MASTER_ID, broker2Addr)))
- ));
-
- List<QueueData> queueDataList = new ArrayList<QueueData>();
- QueueData queueData;
- queueData = new QueueData();
- queueData.setBrokerName(broker1Name);
- queueData.setPerm(6);
- queueData.setReadQueueNums(3);
- queueData.setWriteQueueNums(4);
- queueData.setTopicSysFlag(0);
- queueDataList.add(queueData);
- queueData = new QueueData();
- queueData.setBrokerName(broker2Name);
- queueData.setPerm(6);
- queueData.setReadQueueNums(3);
- queueData.setWriteQueueNums(4);
- queueData.setTopicSysFlag(0);
- queueDataList.add(queueData);
- topicRouteData.setQueueDatas(queueDataList);
-
- LogicalQueuesInfo info = new LogicalQueuesInfo();
- info.put(0, Lists.newArrayList(new LogicalQueueRouteData(0, 0, new
MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.Normal, 0, 0, 0, 0,
broker1Addr)));
- topicRouteData.setLogicalQueuesInfo(info);
- return topicRouteData;
- }
-
- private SendResult createSendResult(SendStatus sendStatus) {
- SendResult sendResult = new SendResult();
- sendResult.setMsgId("123");
- sendResult.setOffsetMsgId("123");
- sendResult.setQueueOffset(456);
- sendResult.setSendStatus(sendStatus);
- sendResult.setRegionId("HZ");
- sendResult.setMessageQueue(new MessageQueue(topic, broker1Name, 0));
- return sendResult;
- }
-}
diff --git
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index a8906b3..5f29fe1 100644
---
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -22,7 +22,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -49,20 +48,21 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
@@ -153,7 +153,7 @@ public class DefaultMQProducerTest {
@Test
public void testSendMessageSync_Success() throws RemotingException,
InterruptedException, MQBrokerException, MQClientException {
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
SendResult sendResult = producer.send(message);
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
@@ -163,7 +163,7 @@ public class DefaultMQProducerTest {
@Test
public void testSendMessageSync_WithBodyCompressed() throws
RemotingException, InterruptedException, MQBrokerException, MQClientException {
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
SendResult sendResult = producer.send(bigMessage);
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
@@ -174,7 +174,7 @@ public class DefaultMQProducerTest {
@Test
public void testSendMessageAsync_Success() throws RemotingException,
InterruptedException, MQBrokerException, MQClientException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
@@ -197,7 +197,7 @@ public class DefaultMQProducerTest {
final AtomicInteger cc = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(6);
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
SendCallback sendCallback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
@@ -239,7 +239,7 @@ public class DefaultMQProducerTest {
final AtomicInteger cc = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(4);
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
SendCallback sendCallback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
@@ -260,7 +260,7 @@ public class DefaultMQProducerTest {
}
};
- List<Message> msgs = new ArrayList<Message>();
+ List<Message> msgs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Message message = new Message();
message.setTopic("test");
@@ -281,7 +281,7 @@ public class DefaultMQProducerTest {
@Test
public void testSendMessageAsync_BodyCompressed() throws
RemotingException, InterruptedException, MQBrokerException, MQClientException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
producer.send(bigMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
@@ -300,7 +300,7 @@ public class DefaultMQProducerTest {
@Test
public void testSendMessageSync_SuccessWithHook() throws Throwable {
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
final Throwable[] assertionErrors = new Throwable[1];
final CountDownLatch countDownLatch = new CountDownLatch(2);
producer.getDefaultMQProducerImpl().registerSendMessageHook(new
SendMessageHook() {
@@ -368,7 +368,7 @@ public class DefaultMQProducerTest {
@Test
public void testRequestMessage() throws RemotingException,
RequestTimeoutException, MQClientException, InterruptedException,
MQBrokerException {
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
final AtomicBoolean finish = new AtomicBoolean(false);
new Thread(new Runnable() {
@Override public void run() {
@@ -394,13 +394,13 @@ public class DefaultMQProducerTest {
@Test(expected = RequestTimeoutException.class)
public void testRequestMessage_RequestTimeoutException() throws
RemotingException, RequestTimeoutException, MQClientException,
InterruptedException, MQBrokerException {
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
Message result = producer.request(message, 3 * 1000L);
}
@Test
public void testAsyncRequest_OnSuccess() throws Exception {
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1);
RequestCallback requestCallback = new RequestCallback() {
@Override public void onSuccess(Message message) {
diff --git
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
index 0a1f685..5d64a93 100644
---
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
@@ -20,11 +20,6 @@ package org.apache.rocketmq.client.trace;
import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer;
import io.opentracing.tag.Tags;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -52,14 +47,17 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
@@ -115,7 +113,7 @@ public class DefaultMQProducerWithOpenTracingTest {
@Test
public void testSendMessageSync_WithTrace_Success() throws
RemotingException, InterruptedException, MQBrokerException, MQClientException {
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp,
producer.getDefaultMQProducerImpl());
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
producer.send(message);
assertThat(tracer.finishedSpans().size()).isEqualTo(1);
MockSpan span = tracer.finishedSpans().get(0);
diff --git
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
index 64f63c5..234e32e 100644
---
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
@@ -17,13 +17,6 @@
package org.apache.rocketmq.client.trace;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -50,17 +43,18 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.nullable;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -128,7 +122,7 @@ public class DefaultMQProducerWithTraceTest {
@Test
public void testSendMessageSync_WithTrace_Success() throws
RemotingException, InterruptedException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp,
traceProducer.getDefaultMQProducerImpl());
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1);
try {
producer.send(message);
@@ -140,7 +134,7 @@ public class DefaultMQProducerWithTraceTest {
@Test
public void testSendMessageSync_WithTrace_NoBrokerSet_Exception() throws
RemotingException, InterruptedException, MQBrokerException, MQClientException {
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1);
try {
producer.send(message);
diff --git
a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
index aca6254..dd6d108 100644
---
a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
@@ -20,12 +20,6 @@ package org.apache.rocketmq.client.trace;
import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer;
import io.opentracing.tag.Tags;
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -59,14 +53,18 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
@@ -133,7 +131,7 @@ public class TransactionMQProducerWithOpenTracingTest {
@Test
public void testSendMessageSync_WithTrace_Success() throws
RemotingException, InterruptedException, MQBrokerException, MQClientException {
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp,
producer.getDefaultMQProducerImpl());
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
producer.sendMessageInTransaction(message, null);
assertThat(tracer.finishedSpans().size()).isEqualTo(2);
diff --git
a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
index b3a4414..f838817 100644
---
a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
@@ -17,13 +17,6 @@
package org.apache.rocketmq.client.trace;
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -57,20 +50,19 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Spy;
-import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
+
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@@ -135,7 +127,7 @@ public class TransactionMQProducerWithTraceTest {
Field fieldHooks =
DefaultMQProducerImpl.class.getDeclaredField("endTransactionHookList");
fieldHooks.setAccessible(true);
- List<EndTransactionHook>hooks = new ArrayList<EndTransactionHook>();
+ List<EndTransactionHook>hooks = new ArrayList<>();
hooks.add(endTransactionHook);
fieldHooks.set(producer.getDefaultMQProducerImpl(), hooks);
@@ -150,14 +142,12 @@ public class TransactionMQProducerWithTraceTest {
@Test
public void testSendMessageSync_WithTrace_Success() throws
RemotingException, InterruptedException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp,
traceProducer.getDefaultMQProducerImpl());
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong(), anyBoolean(),
ArgumentMatchers.<Set<Integer>>any())).thenReturn(createTopicRoute());
- final AtomicReference<EndTransactionContext> context = new
AtomicReference<EndTransactionContext>();
- doAnswer(new Answer() {
- @Override public Object answer(InvocationOnMock mock) throws
Throwable {
- context.set(mock.<EndTransactionContext>getArgument(0));
- return null;
- }
-
}).when(endTransactionHook).endTransaction(ArgumentMatchers.<EndTransactionContext>any());
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
+ AtomicReference<EndTransactionContext> context = new
AtomicReference<>();
+ doAnswer(mock -> {
+ context.set(mock.getArgument(0));
+ return null;
+ }).when(endTransactionHook).endTransaction(any());
producer.sendMessageInTransaction(message, null);
EndTransactionContext ctx = context.get();
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteDataNameSrv.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteDataNameSrv.java
deleted file mode 100644
index e9fb84e..0000000
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteDataNameSrv.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.common.protocol.route;
-
-import com.google.common.base.Objects;
-
-public class TopicRouteDataNameSrv extends TopicRouteData {
- private LogicalQueuesInfoUnordered logicalQueuesInfoUnordered;
-
- public TopicRouteDataNameSrv() {
- }
-
- public LogicalQueuesInfoUnordered getLogicalQueuesInfoUnordered() {
- return logicalQueuesInfoUnordered;
- }
-
- public void setLogicalQueuesInfoUnordered(
- LogicalQueuesInfoUnordered logicalQueuesInfoUnordered) {
- this.logicalQueuesInfoUnordered = logicalQueuesInfoUnordered;
- }
-
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
- if (!super.equals(o))
- return false;
- TopicRouteDataNameSrv srv = (TopicRouteDataNameSrv) o;
- return Objects.equal(logicalQueuesInfoUnordered,
srv.logicalQueuesInfoUnordered);
- }
-
- @Override public int hashCode() {
- return Objects.hashCode(super.hashCode(), logicalQueuesInfoUnordered);
- }
-
- @Override public String toString() {
- return "TopicRouteDataNameSrv{" +
- "logicalQueuesInfoUnordered=" + logicalQueuesInfoUnordered +
- "} " + super.toString();
- }
-
- public TopicRouteData toTopicRouteData() {
- TopicRouteData topicRouteData = new TopicRouteData(this);
- if (this.logicalQueuesInfoUnordered != null) {
-
topicRouteData.setLogicalQueuesInfo(this.logicalQueuesInfoUnordered.toLogicalQueuesInfoOrdered());
- }
- return topicRouteData;
- }
-}
diff --git
a/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java
b/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java
index 87a0fc0..428a928 100644
---
a/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java
+++
b/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
+import
org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
@@ -29,7 +30,7 @@ public class RegisterBrokerBodyTest {
@Test
public void test_encode_decode() throws IOException {
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
- TopicConfigSerializeWrapper topicConfigSerializeWrapper = new
TopicConfigSerializeWrapper();
+ TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper =
new TopicConfigAndMappingSerializeWrapper();
registerBrokerBody.setTopicConfigSerializeWrapper(topicConfigSerializeWrapper);
ConcurrentMap<String, TopicConfig> topicConfigTable = new
ConcurrentHashMap<String, TopicConfig>();
diff --git
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
index 38bff74..e80dec7 100644
---
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
+++
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
@@ -16,42 +16,27 @@
*/
package org.apache.rocketmq.namesrv.processor;
-import com.alibaba.fastjson.JSON;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.rocketmq.common.DataVersion;
-import org.apache.rocketmq.common.MQVersion;
-import org.apache.rocketmq.common.MixAll;
+
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import
org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader;
-import
org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
-import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
-import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
-import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
-import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;