This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new 5e04a27 Finish the rewrite topic for pull and send process
5e04a27 is described below
commit 5e04a2782f3621ca4ff97711586f5826068db71f
Author: dongeforever <[email protected]>
AuthorDate: Wed Nov 10 20:13:17 2021 +0800
Finish the rewrite topic for pull and send process
---
.../apache/rocketmq/broker/BrokerController.java | 2 +-
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 30 ++-
.../broker/processor/PullMessageProcessor.java | 208 ++++++++++++++++++---
.../broker/processor/SendMessageProcessor.java | 74 +++++---
.../rocketmq/common/LogicQueueMappingItem.java | 38 +++-
.../rocketmq/common/TopicQueueMappingContext.java | 73 ++++++++
.../rocketmq/common/TopicQueueMappingDetail.java | 47 +++--
.../protocol/header/PullMessageRequestHeader.java | 4 +-
.../protocol/header/PullMessageResponseHeader.java | 11 ++
.../protocol/header/SendMessageRequestHeader.java | 4 +-
.../apache/rocketmq/remoting/RequestHeader.java | 29 +++
11 files changed, 444 insertions(+), 76 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index e08dbbe..03fd661 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -236,7 +236,7 @@ public class BrokerController {
this.clientHousekeepingService = new ClientHousekeepingService(this);
this.broker2Client = new Broker2Client(this);
this.subscriptionGroupManager = new SubscriptionGroupManager(this);
- this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
+ this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig, this);
this.filterServerManager = new FilterServerManager(this);
this.assignmentManager = new AssignmentManager(this);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index a775864..7f3ce81 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -23,8 +23,12 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+
+import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
+import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -41,6 +45,8 @@ import
org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import
org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
@@ -66,18 +72,20 @@ import
org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class BrokerOuterAPI {
private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final RemotingClient remotingClient;
+ private final BrokerController brokerController;
private final TopAddressing topAddressing = new
TopAddressing(MixAll.getWSAddr());
private String nameSrvAddr = null;
private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new
BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(32), new
ThreadFactoryImpl("brokerOutApi_thread_", true));
- public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
- this(nettyClientConfig, null);
+ public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, final
BrokerController brokerController) {
+ this(nettyClientConfig, null, brokerController);
}
- public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook
rpcHook) {
+ public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook
rpcHook, final BrokerController brokerController) {
this.remotingClient = new NettyRemotingClient(nettyClientConfig);
this.remotingClient.registerRPCHook(rpcHook);
+ this.brokerController = brokerController;
}
public void start() {
@@ -454,4 +462,20 @@ public class BrokerOuterAPI {
public void forwardRequest(String brokerAddr, RemotingCommand request,
long timeoutMillis, InvokeCallback invokeCallback) throws InterruptedException,
RemotingSendRequestException, RemotingTimeoutException,
RemotingTooMuchRequestException, RemotingConnectException {
this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis,
invokeCallback);
}
+
+ public RemotingCommand pullMessage(String brokerName,
PullMessageRequestHeader requestHeader, long timeoutMillis) throws Exception {
+
+ String addr = this.brokerController.getBrokerAddrByName(brokerName);
+ if (addr == null) {
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(String.format("%s-%d cannot find addr when
forward to broker %s in broker %s", requestHeader.getTopic(),
requestHeader.getQueueId(), brokerName,
this.brokerController.getBrokerConfig().getBrokerName()));
+ return response;
+ }
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
+ RemotingCommand response = this.remotingClient.invokeSync(addr,
request, timeoutMillis);
+ assert response != null;
+ return response;
+
+ }
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index e61ef11..5ab3c01 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -24,6 +24,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
import java.nio.ByteBuffer;
import java.util.List;
+
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker;
@@ -35,9 +36,15 @@ import org.apache.rocketmq.broker.longpolling.PullRequest;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
+import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.TopicQueueMappingContext;
+import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
@@ -49,6 +56,8 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
@@ -95,6 +104,132 @@ public class PullMessageProcessor extends
AsyncNettyRequestProcessor implements
return false;
}
+
+ private RemotingCommand buildErrorResponse(int code, String remark) {
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+ response.setCode(code);
+ response.setRemark(remark);
+ return response;
+ }
+
+ private TopicQueueMappingContext
buildTopicQueueMappingContext(PullMessageRequestHeader requestHeader) {
+ if (requestHeader.getPhysical() != null
+ && Boolean.TRUE.equals(requestHeader.getPhysical())) {
+ return null;
+ }
+ TopicQueueMappingDetail mappingDetail =
this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
+ if (mappingDetail == null) {
+ //it is not static topic
+ return null;
+ }
+ String topic = requestHeader.getTopic();
+ Integer globalId = requestHeader.getQueueId();
+ Long globalOffset = requestHeader.getQueueOffset();
+
+ LogicQueueMappingItem mappingItem =
mappingDetail.getLogicQueueMappingItem(globalId, globalOffset);
+ return new TopicQueueMappingContext(topic, globalId, globalOffset,
mappingDetail, mappingItem);
+ }
+
+ private RemotingCommand
rewriteRequestForStaticTopic(PullMessageRequestHeader requestHeader,
TopicQueueMappingContext mappingContext) {
+ try {
+ if (mappingContext == null) {
+ return null;
+ }
+ String topic = mappingContext.getTopic();
+ Integer globalId = mappingContext.getGlobalId();
+ Long globalOffset = mappingContext.getGlobalOffset();
+
+ LogicQueueMappingItem mappingItem =
mappingContext.getMappingItem();
+ if (mappingItem == null) {
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d cannot find mapping item in request process of current
broker %s",
+ topic, globalId,
this.brokerController.getBrokerConfig().getBrokerName()));
+ }
+
+ if (globalOffset < mappingItem.getStartOffset()) {
+ log.warn("{}-{} fetch offset {} smaller than the min mapping
offset {}", topic, globalId, globalOffset, mappingItem.getStartOffset());
+ return buildErrorResponse(ResponseCode.PULL_OFFSET_MOVED,
String.format("%s-%d fetch offset {} smaller than the min mapping offset {} in
broker %s",
+ topic, globalId, globalOffset,
mappingItem.getStartOffset(),
this.brokerController.getBrokerConfig().getBrokerName()));
+ }
+ //below are physical info
+ String bname = mappingItem.getBname();
+ Integer phyQueueId = mappingItem.getQueueId();
+ Long phyQueueOffset =
mappingItem.convertToPhysicalQueueOffset(globalOffset);
+ requestHeader.setQueueId(phyQueueId);
+ requestHeader.setQueueOffset(phyQueueOffset);
+ if (mappingItem.isEndOffsetDecided()
+ && requestHeader.getMaxMsgNums() != null) {
+ requestHeader.setMaxMsgNums((int)
Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(),
requestHeader.getMaxMsgNums()));
+ }
+
+ if
(this.brokerController.getBrokerConfig().getBrokerName().equals(bname)) {
+ //just let it go
+ return null;
+ }
+
+ requestHeader.setPhysical(true);
+ RemotingCommand response =
this.brokerController.getBrokerOuterAPI().pullMessage(bname, requestHeader,
this.brokerController.getBrokerConfig().getForwardTimeout());
+ switch (response.getCode()) {
+ case ResponseCode.SYSTEM_ERROR:
+ return response;
+ case ResponseCode.SUCCESS:
+ case ResponseCode.PULL_NOT_FOUND:
+ case ResponseCode.PULL_RETRY_IMMEDIATELY:
+ case ResponseCode.PULL_OFFSET_MOVED:
+ break;
+ default:
+ throw new MQBrokerException(response.getCode(),
response.getRemark(), mappingItem.getBname());
+ }
+ PullMessageResponseHeader responseHeader =
+ (PullMessageResponseHeader)
response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+ {
+ RemotingCommand rewriteResult =
rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext);
+ if (rewriteResult != null) {
+ return rewriteResult;
+ }
+ }
+ return response;
+ } catch (Throwable t) {
+ return buildErrorResponse(ResponseCode.SYSTEM_ERROR,
t.getMessage());
+ }
+ }
+
+ private RemotingCommand
rewriteResponseForStaticTopic(PullMessageRequestHeader requestHeader,
PullMessageResponseHeader responseHeader, TopicQueueMappingContext
mappingContext) {
+ try {
+ if (mappingContext == null) {
+ return null;
+ }
+ TopicQueueMappingDetail mappingDetail =
mappingContext.getMappingDetail();
+ LogicQueueMappingItem mappingItem =
mappingContext.getMappingItem();
+ //handle nextBeginOffset
+ {
+ long nextBeginOffset = responseHeader.getNextBeginOffset();
+ assert nextBeginOffset >= requestHeader.getQueueOffset();
+ //the next begin offset should no more than the end offset
+ if (mappingItem.isEndOffsetDecided()
+ && nextBeginOffset >= mappingItem.getEndOffset()) {
+ nextBeginOffset = mappingItem.getEndOffset();
+ }
+
responseHeader.setNextBeginOffset(mappingItem.convertToStaticQueueOffset(nextBeginOffset));
+ }
+ //handle min offset
+
responseHeader.setMinOffset(mappingItem.convertToStaticQueueOffset(Math.max(mappingItem.getStartOffset(),
responseHeader.getMinOffset())));
+ //handle max offset
+ {
+ if (mappingItem.isEndOffsetDecided()) {
+
responseHeader.setMaxOffset(Math.max(mappingItem.convertToMaxStaticQueueOffset(),
mappingDetail.getMaxOffsetFromMapping(mappingContext.getGlobalId())));
+ } else {
+
responseHeader.setMaxOffset(mappingItem.convertToStaticQueueOffset(responseHeader.getMaxOffset()));
+ }
+ }
+ //set the offsetDelta
+ responseHeader.setOffsetDelta(mappingItem.convertOffsetDelta());
+ } catch (Throwable t) {
+ return buildErrorResponse(ResponseCode.SYSTEM_ERROR,
t.getMessage());
+ }
+ return null;
+ }
+
+
private RemotingCommand processRequest(final Channel channel,
RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
RemotingCommand response =
RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
@@ -147,6 +282,15 @@ public class PullMessageProcessor extends
AsyncNettyRequestProcessor implements
return response;
}
+ TopicQueueMappingContext mappingContext =
buildTopicQueueMappingContext(requestHeader);
+
+ {
+ RemotingCommand rewriteResult =
rewriteRequestForStaticTopic(requestHeader, mappingContext);
+ if (rewriteResult != null) {
+ return rewriteResult;
+ }
+ }
+
int queueId = requestHeader.getQueueId();
if (queueId < 0 || queueId >= topicConfig.getReadQueueNums()) {
String errorInfo = String.format("queueId[%d] is illegal,
topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
@@ -395,6 +539,15 @@ public class PullMessageProcessor extends
AsyncNettyRequestProcessor implements
break;
}
+ //rewrite the response for the
+ {
+ RemotingCommand rewriteResult =
rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext);
+ if (rewriteResult != null) {
+ return rewriteResult;
+ }
+ }
+
+
if (this.hasConsumeMessageHook()) {
ConsumeMessageContext context = new ConsumeMessageContext();
context.setConsumerGroup(requestHeader.getConsumerGroup());
@@ -507,31 +660,7 @@ public class PullMessageProcessor extends
AsyncNettyRequestProcessor implements
case ResponseCode.PULL_RETRY_IMMEDIATELY:
break;
case ResponseCode.PULL_OFFSET_MOVED:
- if
(this.brokerController.getMessageStoreConfig().getBrokerRole() !=
BrokerRole.SLAVE
- ||
this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
- MessageQueue mq = new MessageQueue();
- mq.setTopic(requestHeader.getTopic());
- mq.setQueueId(requestHeader.getQueueId());
-
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
-
- OffsetMovedEvent event = new OffsetMovedEvent();
-
event.setConsumerGroup(requestHeader.getConsumerGroup());
- event.setMessageQueue(mq);
- event.setOffsetRequest(requestHeader.getQueueOffset());
- event.setOffsetNew(nextBeginOffset);
- this.generateOffsetMovedEvent(event);
- log.warn(
- "PULL_OFFSET_MOVED:correction offset. topic={},
groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
- requestHeader.getTopic(),
requestHeader.getConsumerGroup(), event.getOffsetRequest(),
event.getOffsetNew(),
- responseHeader.getSuggestWhichBrokerId());
- } else {
-
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
- response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
- log.warn("PULL_OFFSET_MOVED:none correction. topic={},
groupId={}, requestOffset={}, suggestBrokerId={}",
- requestHeader.getTopic(),
requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
- responseHeader.getSuggestWhichBrokerId());
- }
-
+ handleOffsetMoved(requestHeader, responseHeader, response,
nextBeginOffset, subscriptionGroupConfig);
break;
default:
assert false;
@@ -552,6 +681,35 @@ public class PullMessageProcessor extends
AsyncNettyRequestProcessor implements
return response;
}
+ public void handleOffsetMoved(PullMessageRequestHeader requestHeader,
PullMessageResponseHeader responseHeader, RemotingCommand response,
+ long nextBeginOffset,
+ SubscriptionGroupConfig
subscriptionGroupConfig) {
+ if (this.brokerController.getMessageStoreConfig().getBrokerRole() !=
BrokerRole.SLAVE
+ ||
this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
+ MessageQueue mq = new MessageQueue();
+ mq.setTopic(requestHeader.getTopic());
+ mq.setQueueId(requestHeader.getQueueId());
+
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
+
+ OffsetMovedEvent event = new OffsetMovedEvent();
+ event.setConsumerGroup(requestHeader.getConsumerGroup());
+ event.setMessageQueue(mq);
+ event.setOffsetRequest(requestHeader.getQueueOffset());
+ event.setOffsetNew(nextBeginOffset);
+ this.generateOffsetMovedEvent(event);
+ log.warn(
+ "PULL_OFFSET_MOVED:correction offset. topic={},
groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
+ requestHeader.getTopic(),
requestHeader.getConsumerGroup(), event.getOffsetRequest(),
event.getOffsetNew(),
+ responseHeader.getSuggestWhichBrokerId());
+ } else {
+
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
+ response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+ log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={},
requestOffset={}, suggestBrokerId={}",
+ requestHeader.getTopic(),
requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
+ responseHeader.getSuggestWhichBrokerId());
+ }
+ }
+
private void prepareRedirectResponse(RemotingCommand response,
LogicalQueuesInfoInBroker logicalQueuesInfo,
LogicalQueueRouteData queueRouteData) {
LogicalQueueRouteData nextReadableLogicalQueueRouteData =
logicalQueuesInfo.nextAvailableLogicalRouteData(queueRouteData,
LogicalQueueRouteData::isReadable);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 7b88c75..62e2828 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.TopicQueueMappingContext;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
@@ -98,38 +99,53 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
if (requestHeader == null) {
return CompletableFuture.completedFuture(null);
}
- RemotingCommand rewriteResult =
rewriteRequestForStaticTopic(requestHeader);
+ TopicQueueMappingContext mappingContext =
buildTopicQueueMappingContext(requestHeader);
+ RemotingCommand rewriteResult =
rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return CompletableFuture.completedFuture(rewriteResult);
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request,
mqtraceContext);
if (requestHeader.isBatch()) {
- return this.asyncSendBatchMessage(ctx, request,
mqtraceContext, requestHeader);
+ return this.asyncSendBatchMessage(ctx, request,
mqtraceContext, requestHeader, mappingContext);
} else {
- return this.asyncSendMessage(ctx, request, mqtraceContext,
requestHeader);
+ return this.asyncSendMessage(ctx, request, mqtraceContext,
requestHeader, mappingContext);
}
}
}
+
+
private RemotingCommand buildErrorResponse(int code, String remark) {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
response.setCode(code);
response.setRemark(remark);
return response;
}
+
+ private TopicQueueMappingContext
buildTopicQueueMappingContext(SendMessageRequestHeader requestHeader) {
+ if (requestHeader.getPhysical() != null
+ && Boolean.TRUE.equals(requestHeader.getPhysical())) {
+ return null;
+ }
+ TopicQueueMappingDetail mappingDetail =
this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
+ if (mappingDetail == null) {
+ //it is not static topic
+ return null;
+ }
+ return new TopicQueueMappingContext(requestHeader.getTopic(),
requestHeader.getQueueId(), null, mappingDetail, null);
+ }
/**
* If the response is not null, it meets some errors
* @param requestHeader
* @return
*/
- private RemotingCommand
rewriteRequestForStaticTopic(SendMessageRequestHeader requestHeader) {
+ private RemotingCommand
rewriteRequestForStaticTopic(SendMessageRequestHeader requestHeader,
TopicQueueMappingContext mappingContext) {
try {
- TopicQueueMappingDetail mappingDetail =
this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
- if (mappingDetail == null) {
- //it is not static topic
+ if (mappingContext == null) {
return null;
}
+ TopicQueueMappingDetail mappingDetail =
mappingContext.getMappingDetail();
Integer phyQueueId = null;
//compatible with the old logic, but it fact, this should not
happen
if (requestHeader.getQueueId() < 0) {
@@ -151,22 +167,18 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
}
}
- private RemotingCommand rewriteResponseForStaticTopic(String topic,
SendMessageResponseHeader responseHeader) {
+ private RemotingCommand
rewriteResponseForStaticTopic(SendMessageRequestHeader requestHeader,
SendMessageResponseHeader responseHeader, TopicQueueMappingContext
mappingContext) {
try {
- TopicQueueMappingDetail mappingDetail =
this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
- if (mappingDetail == null) {
+ if (mappingContext == null) {
return null;
}
- Integer globalId =
mappingDetail.getCurrIdMapRevert().get(responseHeader.getQueueId());
- if (globalId == null) {
- return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exist in response process of current broker %s",
topic, responseHeader.getQueueId(),
this.brokerController.getBrokerConfig().getBrokerName()));
- }
- long staticLogicOffset =
mappingDetail.convertToLogicOffset(globalId, responseHeader.getQueueOffset());
- if (staticLogicOffset < 0) {
- return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d convert offset error in current broker %s", topic,
responseHeader.getQueueId(),
this.brokerController.getBrokerConfig().getBrokerName()));
+ TopicQueueMappingDetail mappingDetail =
mappingContext.getMappingDetail();
+ long staticLogicOffset =
mappingDetail.convertToLogicOffset(mappingContext.getGlobalId(),
responseHeader.getQueueOffset());
+ if (staticLogicOffset < 0) {
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d convert offset error in current broker %s",
mappingContext.getTopic(), responseHeader.getQueueId(),
this.brokerController.getBrokerConfig().getBrokerName()));
}
- responseHeader.setQueueId(globalId);
+ responseHeader.setQueueId(mappingContext.getGlobalId());
responseHeader.setQueueOffset(staticLogicOffset);
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR,
t.getMessage());
@@ -332,7 +344,8 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
private CompletableFuture<RemotingCommand>
asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
-
SendMessageRequestHeader requestHeader) {
+
SendMessageRequestHeader requestHeader,
+
TopicQueueMappingContext mappingContext) {
final RemotingCommand response = preSend(ctx, request, requestHeader);
final SendMessageResponseHeader responseHeader =
(SendMessageResponseHeader)response.readCustomHeader();
@@ -392,7 +405,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
} else {
putMessageResult =
this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
- return handlePutMessageResultFuture(putMessageResult, response,
request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
+ return handlePutMessageResultFuture(putMessageResult, response,
request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt,
requestHeader, mappingContext);
}
private CompletableFuture<RemotingCommand>
handlePutMessageResultFuture(CompletableFuture<PutMessageResult>
putMessageResult,
@@ -402,9 +415,11 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
SendMessageResponseHeader responseHeader,
SendMessageContext sendMessageContext,
ChannelHandlerContext ctx,
-
int queueIdInt) {
+
int queueIdInt,
+
SendMessageRequestHeader requestHeader,
+
TopicQueueMappingContext mappingContext) {
return putMessageResult.thenApply((r) ->
- handlePutMessageResult(r, response, request, msgInner,
responseHeader, sendMessageContext, ctx, queueIdInt)
+ handlePutMessageResult(r, response, request, msgInner,
responseHeader, sendMessageContext, ctx, queueIdInt, requestHeader,
mappingContext)
);
}
@@ -456,7 +471,8 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext
sendMessageContext,
- final SendMessageRequestHeader
requestHeader) throws RemotingCommandException {
+ final SendMessageRequestHeader
requestHeader,
+ final TopicQueueMappingContext
mappingContext) throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader =
(SendMessageResponseHeader)response.readCustomHeader();
@@ -525,14 +541,15 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
putMessageResult =
this.brokerController.getMessageStore().putMessage(msgInner);
}
- return handlePutMessageResult(putMessageResult, response, request,
msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
+ return handlePutMessageResult(putMessageResult, response, request,
msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, requestHeader,
mappingContext);
}
private RemotingCommand handlePutMessageResult(PutMessageResult
putMessageResult, RemotingCommand response,
RemotingCommand request,
MessageExt msg,
SendMessageResponseHeader
responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext
ctx,
- int queueIdInt) {
+ int queueIdInt,
SendMessageRequestHeader requestHeader,
+ TopicQueueMappingContext
mappingContext) {
if (putMessageResult == null) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store putMessage return null");
@@ -609,7 +626,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
- RemotingCommand rewriteResult =
rewriteResponseForStaticTopic(msg.getTopic(), responseHeader);
+ RemotingCommand rewriteResult =
rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
@@ -647,7 +664,8 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
private CompletableFuture<RemotingCommand>
asyncSendBatchMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
-
SendMessageRequestHeader requestHeader) {
+
SendMessageRequestHeader requestHeader,
+
TopicQueueMappingContext mappingContext) {
final RemotingCommand response = preSend(ctx, request, requestHeader);
final SendMessageResponseHeader responseHeader =
(SendMessageResponseHeader)response.readCustomHeader();
@@ -689,7 +707,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
MessageAccessor.putProperty(messageExtBatch,
MessageConst.PROPERTY_CLUSTER, clusterName);
CompletableFuture<PutMessageResult> putMessageResult =
this.brokerController.getMessageStore().asyncPutMessages(messageExtBatch);
- return handlePutMessageResultFuture(putMessageResult, response,
request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt);
+ return handlePutMessageResultFuture(putMessageResult, response,
request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt,
requestHeader, mappingContext);
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
index 78a27dc..0a9ee96 100644
--- a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
@@ -7,6 +7,7 @@ public class LogicQueueMappingItem {
private String bname;
private long logicOffset; // the start of the logic offset
private long startOffset; // the start of the physical offset
+ private long endOffset; // the end of the physical offset
private long timeOfStart = -1; //mutable
public LogicQueueMappingItem(int gen, int queueId, String bname, long
logicOffset, long startOffset, long timeOfStart) {
@@ -18,8 +19,32 @@ public class LogicQueueMappingItem {
this.timeOfStart = timeOfStart;
}
- public long convertToStaticLogicOffset(long physicalLogicOffset) {
- return logicOffset + (physicalLogicOffset - startOffset);
+ public long convertToStaticQueueOffset(long physicalQueueOffset) {
+ return logicOffset + (physicalQueueOffset - startOffset);
+ }
+
+ public long convertToPhysicalQueueOffset(long staticQueueOffset) {
+ return (staticQueueOffset - logicOffset) + startOffset;
+ }
+
+ public long convertToMaxStaticQueueOffset() {
+ if (endOffset >= startOffset) {
+ return logicOffset + endOffset - startOffset;
+ } else {
+ return logicOffset;
+ }
+ }
+ public boolean isShouldDeleted() {
+ return endOffset == startOffset;
+ }
+
+ public boolean isEndOffsetDecided() {
+ //if the endOffset == startOffset, then the item should be deleted
+ return endOffset > startOffset;
+ }
+
+ public long convertOffsetDelta() {
+ return logicOffset - startOffset;
}
public int getGen() {
@@ -55,4 +80,13 @@ public class LogicQueueMappingItem {
public long getStartOffset() {
return startOffset;
}
+
+ public long getEndOffset() {
+ return endOffset;
+ }
+
+
+ public void setEndOffset(long endOffset) {
+ this.endOffset = endOffset;
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java
new file mode 100644
index 0000000..50ac43e
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+public class TopicQueueMappingContext {
+ private String topic;
+ private Integer globalId;
+ private Long globalOffset;
+ private TopicQueueMappingDetail mappingDetail;
+ private LogicQueueMappingItem mappingItem;
+
+ public TopicQueueMappingContext(String topic, Integer globalId, Long
globalOffset, TopicQueueMappingDetail mappingDetail, LogicQueueMappingItem
mappingItem) {
+ this.topic = topic;
+ this.globalId = globalId;
+ this.globalOffset = globalOffset;
+ this.mappingDetail = mappingDetail;
+ this.mappingItem = mappingItem;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public Integer getGlobalId() {
+ return globalId;
+ }
+
+ public void setGlobalId(Integer globalId) {
+ this.globalId = globalId;
+ }
+
+ public Long getGlobalOffset() {
+ return globalOffset;
+ }
+
+ public void setGlobalOffset(Long globalOffset) {
+ this.globalOffset = globalOffset;
+ }
+
+ public TopicQueueMappingDetail getMappingDetail() {
+ return mappingDetail;
+ }
+
+ public void setMappingDetail(TopicQueueMappingDetail mappingDetail) {
+ this.mappingDetail = mappingDetail;
+ }
+
+ public LogicQueueMappingItem getMappingItem() {
+ return mappingItem;
+ }
+
+ public void setMappingItem(LogicQueueMappingItem mappingItem) {
+ this.mappingItem = mappingItem;
+ }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index 0021310..8c2aad9 100644
---
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
+++
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -27,9 +27,6 @@ public class TopicQueueMappingDetail extends
TopicQueueMappingInfo {
// the mapping info in current broker, do not register to nameserver
ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>>
hostedQueues = new ConcurrentHashMap<Integer,
ImmutableList<LogicQueueMappingItem>>();
- transient ConcurrentMap<Integer/*physicalId*/, Integer/*logicId*/>
currIdMapRevert = new ConcurrentHashMap<Integer, Integer>();
-
-
public TopicQueueMappingDetail(String topic, int totalQueues, String
bname) {
super(topic, totalQueues, bname);
@@ -48,7 +45,6 @@ public class TopicQueueMappingDetail extends
TopicQueueMappingInfo {
public void buildIdMap() {
this.currIdMap = buildIdMap(LEVEL_0);
this.prevIdMap = buildIdMap(LEVEL_1);
- this.currIdMapRevert = revert(this.currIdMap);
}
public ConcurrentMap<Integer, Integer> revert(ConcurrentMap<Integer,
Integer> original) {
@@ -103,16 +99,48 @@ public class TopicQueueMappingDetail extends
TopicQueueMappingInfo {
return -1;
}
if (bname.equals(mappingItems.get(mappingItems.size() -
1).getBname())) {
- return mappingItems.get(mappingItems.size() -
1).convertToStaticLogicOffset(physicalLogicOffset);
+ return mappingItems.get(mappingItems.size() -
1).convertToStaticQueueOffset(physicalLogicOffset);
}
//Consider the "switch" process, reduce the error
if (mappingItems.size() >= 2
&& bname.equals(mappingItems.get(mappingItems.size() -
2).getBname())) {
- return mappingItems.get(mappingItems.size() -
2).convertToStaticLogicOffset(physicalLogicOffset);
+ return mappingItems.get(mappingItems.size() -
2).convertToStaticQueueOffset(physicalLogicOffset);
}
return -1;
}
+ public LogicQueueMappingItem getLogicQueueMappingItem(Integer globalId,
long logicOffset) {
+ List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
+ if (mappingItems == null
+ || mappingItems.isEmpty()) {
+ return null;
+ }
+ //Could use bi-search to polish performance
+ for (int i = mappingItems.size() - 1; i >= 0; i--) {
+ LogicQueueMappingItem item = mappingItems.get(i);
+ if (logicOffset >= item.getLogicOffset()) {
+ return item;
+ }
+ }
+ //if not found, maybe out of range, return the first one
+ for (int i = 0; i < mappingItems.size(); i++) {
+ if (!mappingItems.get(i).isShouldDeleted()) {
+ return mappingItems.get(i);
+ }
+ }
+ return null;
+ }
+
+ public long getMaxOffsetFromMapping(Integer globalId) {
+ List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
+ if (mappingItems == null
+ || mappingItems.isEmpty()) {
+ return -1;
+ }
+ LogicQueueMappingItem item = mappingItems.get(mappingItems.size() -
1);
+ return item.convertToMaxStaticQueueOffset();
+ }
+
public TopicQueueMappingInfo cloneAsMappingInfo() {
TopicQueueMappingInfo topicQueueMappingInfo = new
TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname);
@@ -122,13 +150,6 @@ public class TopicQueueMappingDetail extends
TopicQueueMappingInfo {
return topicQueueMappingInfo;
}
- public ConcurrentMap<Integer, Integer> getCurrIdMapRevert() {
- return currIdMapRevert;
- }
-
- public void setCurrIdMapRevert(ConcurrentMap<Integer, Integer>
currIdMapRevert) {
- this.currIdMapRevert = currIdMapRevert;
- }
public int getTotalQueues() {
return totalQueues;
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 106e89e..1bce01f 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -20,12 +20,12 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.RequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class PullMessageRequestHeader implements CommandCustomHeader {
+public class PullMessageRequestHeader extends RequestHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
index 0112f7d..88af984 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
@@ -22,6 +22,7 @@ package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class PullMessageResponseHeader implements CommandCustomHeader {
@@ -33,6 +34,8 @@ public class PullMessageResponseHeader implements
CommandCustomHeader {
private Long minOffset;
@CFNotNull
private Long maxOffset;
+ @CFNullable
+ private Long offsetDelta;
@Override
public void checkFields() throws RemotingCommandException {
@@ -69,4 +72,12 @@ public class PullMessageResponseHeader implements
CommandCustomHeader {
public void setSuggestWhichBrokerId(Long suggestWhichBrokerId) {
this.suggestWhichBrokerId = suggestWhichBrokerId;
}
+
+ public Long getOffsetDelta() {
+ return offsetDelta;
+ }
+
+ public void setOffsetDelta(Long offsetDelta) {
+ this.offsetDelta = offsetDelta;
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index 2df31e6..f9dcbff 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -20,12 +20,12 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.RequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class SendMessageRequestHeader implements CommandCustomHeader {
+public class SendMessageRequestHeader extends RequestHeader {
@CFNotNull
private String producerGroup;
@CFNotNull
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/RequestHeader.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/RequestHeader.java
new file mode 100644
index 0000000..109fb9e
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RequestHeader.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting;
+
+public abstract class RequestHeader implements CommandCustomHeader {
+ protected Boolean physical;
+
+ public Boolean getPhysical() {
+ return physical;
+ }
+
+ public void setPhysical(Boolean physical) {
+ this.physical = physical;
+ }
+}