This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new 5274294 Finish the rewrite logic for AdminBrokerProcessor
5274294 is described below
commit 52742942d89ee1b74ac4d0a0236bbeae85977baa
Author: dongeforever <[email protected]>
AuthorDate: Thu Nov 11 17:25:26 2021 +0800
Finish the rewrite logic for AdminBrokerProcessor
---
.../client/rebalance/RebalanceLockManager.java | 9 +-
.../broker/processor/AdminBrokerProcessor.java | 123 ++++++++++++++++-----
.../rocketmq/client/impl/MQClientAPIImpl.java | 1 -
.../GetEarliestMsgStoretimeRequestHeader.java | 8 +-
.../protocol/header/GetMaxOffsetRequestHeader.java | 17 +--
.../protocol/header/GetMinOffsetRequestHeader.java | 8 +-
6 files changed, 121 insertions(+), 45 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
index 678b1f5..9056998 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
@@ -16,16 +16,17 @@
*/
package org.apache.rocketmq.broker.client.rebalance;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.common.message.MessageQueue;
public class RebalanceLockManager {
private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.REBALANCE_LOCK_LOGGER_NAME);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 5f54889..34f9aad 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -696,6 +696,28 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
return response;
}
+ private RemotingCommand
rewriteRequestForStaticTopic(GetMaxOffsetRequestHeader requestHeader,
TopicQueueMappingContext mappingContext) {
+ if (mappingContext.getMappingDetail() == null) {
+ return null;
+ }
+ TopicQueueMappingDetail mappingDetail =
mappingContext.getMappingDetail();
+ LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
+ if (mappingItem == null
+ || !mappingDetail.getBname().equals(mappingItem.getBname())) {
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exit in request process of current broker %s",
mappingContext.getTopic(), mappingContext.getGlobalId(),
mappingDetail.getBname()));
+ }
+ long offset =
this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(),
mappingItem.getQueueId());
+
+ offset = mappingItem.computeStaticQueueOffset(offset);
+
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
+ final GetMaxOffsetResponseHeader responseHeader =
(GetMaxOffsetResponseHeader) response.readCustomHeader();
+ responseHeader.setOffset(offset);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }
+
private RemotingCommand getMaxOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
@@ -703,33 +725,13 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
final GetMaxOffsetRequestHeader requestHeader =
(GetMaxOffsetRequestHeader)
request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
- String topic = requestHeader.getTopic();
- int queueId = requestHeader.getQueueId();
-
- if (requestHeader.getLogicalQueue()) {
- LogicalQueuesInfoInBroker logicalQueuesInfo =
this.brokerController.getTopicConfigManager().selectLogicalQueuesInfo(topic);
- if (logicalQueuesInfo != null) {
- // max offset must be in the queue route with largest offset
- LogicalQueueRouteData requestLogicalQueueRouteData =
logicalQueuesInfo.queryQueueRouteDataByQueueId(queueId, Long.MAX_VALUE);
- if (requestLogicalQueueRouteData != null) {
- logicalQueuesInfo.readLock().lock();
- try {
- List<LogicalQueueRouteData> queueRouteDataList =
logicalQueuesInfo.get(requestLogicalQueueRouteData.getLogicalQueueIndex());
- if (queueRouteDataList != null &&
!queueRouteDataList.isEmpty()) {
- LogicalQueueRouteData
selectedLogicalQueueRouteData =
queueRouteDataList.get(queueRouteDataList.size() - 1);
- if
(!Objects.equals(selectedLogicalQueueRouteData.getMessageQueue(), new
MessageQueue(topic, this.brokerController.getBrokerConfig().getBrokerName(),
queueId))) {
- log.info("getMaxOffset topic={} queueId={} not
latest, redirect: {}", topic, queueId, selectedLogicalQueueRouteData);
-
response.addExtField(MessageConst.PROPERTY_REDIRECT, "1");
- }
- }
- } finally {
- logicalQueuesInfo.readLock().unlock();
- }
- }
- }
+ TopicQueueMappingContext mappingContext =
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
+ RemotingCommand rewriteResult =
rewriteRequestForStaticTopic(requestHeader, mappingContext);
+ if (rewriteResult != null) {
+ return rewriteResult;
}
- long offset =
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId,
requestHeader.isCommitted());
+ long offset =
this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
responseHeader.setOffset(offset);
@@ -738,6 +740,36 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
return response;
}
+ private RemotingCommand
rewriteRequestForStaticTopic(GetMinOffsetRequestHeader requestHeader,
TopicQueueMappingContext mappingContext) {
+ if (mappingContext.getMappingDetail() == null) {
+ return null;
+ }
+ TopicQueueMappingDetail mappingDetail =
mappingContext.getMappingDetail();
+ LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
+ if (mappingItem == null
+ || !mappingDetail.getBname().equals(mappingItem.getBname())) {
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exit in request process of current broker %s",
mappingContext.getTopic(), mappingContext.getGlobalId(),
mappingDetail.getBname()));
+ };
+ try {
+ RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET,
requestHeader, null);
+ RpcResponse rpcResponse =
this.brokerController.getBrokerOuterAPI().getMinOffset(mappingItem.getBname(),
rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
+ if (rpcResponse.getException() != null) {
+ throw rpcResponse.getException();
+ }
+ GetMinOffsetResponseHeader offsetResponseHeader =
(GetMinOffsetResponseHeader) rpcResponse.getHeader();
+ long offset =
mappingItem.computeStaticQueueOffset(offsetResponseHeader.getOffset());
+
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
+ final GetMinOffsetResponseHeader responseHeader =
(GetMinOffsetResponseHeader) response.readCustomHeader();
+ responseHeader.setOffset(offset);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }catch (Throwable t) {
+ return buildErrorResponse(ResponseCode.SYSTEM_ERROR,
t.getMessage());
+ }
+ }
+
private RemotingCommand getMinOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
@@ -745,6 +777,12 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
final GetMinOffsetRequestHeader requestHeader =
(GetMinOffsetRequestHeader)
request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
+ TopicQueueMappingContext mappingContext =
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
false, 0L);
+ RemotingCommand rewriteResult =
rewriteRequestForStaticTopic(requestHeader, mappingContext);
+ if (rewriteResult != null) {
+ return rewriteResult;
+ }
+
long offset =
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
responseHeader.setOffset(offset);
@@ -753,6 +791,35 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
return response;
}
+ private RemotingCommand
rewriteRequestForStaticTopic(GetEarliestMsgStoretimeRequestHeader
requestHeader, TopicQueueMappingContext mappingContext) {
+ if (mappingContext.getMappingDetail() == null) {
+ return null;
+ }
+ TopicQueueMappingDetail mappingDetail =
mappingContext.getMappingDetail();
+ LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
+ if (mappingItem == null
+ || !mappingDetail.getBname().equals(mappingItem.getBname())) {
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exit in request process of current broker %s",
mappingContext.getTopic(), mappingContext.getGlobalId(),
mappingDetail.getBname()));
+ };
+ try {
+ RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET,
requestHeader, null);
+ RpcResponse rpcResponse =
this.brokerController.getBrokerOuterAPI().getEarliestMsgStoretime(mappingItem.getBname(),
rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
+ if (rpcResponse.getException() != null) {
+ throw rpcResponse.getException();
+ }
+ GetEarliestMsgStoretimeResponseHeader offsetResponseHeader =
(GetEarliestMsgStoretimeResponseHeader) rpcResponse.getHeader();
+
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class);
+ final GetEarliestMsgStoretimeResponseHeader responseHeader =
(GetEarliestMsgStoretimeResponseHeader) response.readCustomHeader();
+ responseHeader.setTimestamp(offsetResponseHeader.getTimestamp());
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }catch (Throwable t) {
+ return buildErrorResponse(ResponseCode.SYSTEM_ERROR,
t.getMessage());
+ }
+ }
+
private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class);
@@ -760,6 +827,12 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
final GetEarliestMsgStoretimeRequestHeader requestHeader =
(GetEarliestMsgStoretimeRequestHeader)
request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class);
+ TopicQueueMappingContext mappingContext =
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
false, 0L);
+ RemotingCommand rewriteResult =
rewriteRequestForStaticTopic(requestHeader, mappingContext);
+ if (rewriteResult != null) {
+ return rewriteResult;
+ }
+
long timestamp =
this.brokerController.getMessageStore().getEarliestMessageTime(requestHeader.getTopic(),
requestHeader.getQueueId());
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 67fd937..2d9f17d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1126,7 +1126,6 @@ public class MQClientAPIImpl {
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
requestHeader.setCommitted(committed);
- requestHeader.setLogicalQueue(fromLogicalQueue);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
RemotingCommand response =
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
addr),
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
index c64381f..fea1736 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
@@ -20,11 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class GetEarliestMsgStoretimeRequestHeader implements
CommandCustomHeader {
+public class GetEarliestMsgStoretimeRequestHeader extends
TopicQueueRequestHeader {
@CFNotNull
private String topic;
@CFNotNull
@@ -34,18 +34,22 @@ public class GetEarliestMsgStoretimeRequestHeader
implements CommandCustomHeader
public void checkFields() throws RemotingCommandException {
}
+ @Override
public String getTopic() {
return topic;
}
+ @Override
public void setTopic(String topic) {
this.topic = topic;
}
+ @Override
public Integer getQueueId() {
return queueId;
}
+ @Override
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
index 6963195..e4226c2 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
@@ -20,34 +20,37 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
+public class GetMaxOffsetRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private String topic;
@CFNotNull
private Integer queueId;
private boolean committed;
- private boolean logicalQueue;
@Override
public void checkFields() throws RemotingCommandException {
}
+ @Override
public String getTopic() {
return topic;
}
+ @Override
public void setTopic(String topic) {
this.topic = topic;
}
+ @Override
public Integer getQueueId() {
return queueId;
}
+ @Override
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
@@ -59,12 +62,4 @@ public class GetMaxOffsetRequestHeader implements
CommandCustomHeader {
public boolean isCommitted() {
return committed;
}
-
- public void setLogicalQueue(boolean logicalQueue) {
- this.logicalQueue = logicalQueue;
- }
-
- public boolean getLogicalQueue() {
- return logicalQueue;
- }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
index 6fb8ed4..6889ae8 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
@@ -20,11 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class GetMinOffsetRequestHeader implements CommandCustomHeader {
+public class GetMinOffsetRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private String topic;
@CFNotNull
@@ -34,18 +34,22 @@ public class GetMinOffsetRequestHeader implements
CommandCustomHeader {
public void checkFields() throws RemotingCommandException {
}
+ @Override
public String getTopic() {
return topic;
}
+ @Override
public void setTopic(String topic) {
this.topic = topic;
}
+ @Override
public Integer getQueueId() {
return queueId;
}
+ @Override
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}