This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by 
this push:
     new 5274294  Finish the rewrite logic for AdminBrokerProcessor
5274294 is described below

commit 52742942d89ee1b74ac4d0a0236bbeae85977baa
Author: dongeforever <[email protected]>
AuthorDate: Thu Nov 11 17:25:26 2021 +0800

    Finish the rewrite logic for AdminBrokerProcessor
---
 .../client/rebalance/RebalanceLockManager.java     |   9 +-
 .../broker/processor/AdminBrokerProcessor.java     | 123 ++++++++++++++++-----
 .../rocketmq/client/impl/MQClientAPIImpl.java      |   1 -
 .../GetEarliestMsgStoretimeRequestHeader.java      |   8 +-
 .../protocol/header/GetMaxOffsetRequestHeader.java |  17 +--
 .../protocol/header/GetMinOffsetRequestHeader.java |   8 +-
 6 files changed, 121 insertions(+), 45 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
index 678b1f5..9056998 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
@@ -16,16 +16,17 @@
  */
 package org.apache.rocketmq.broker.client.rebalance;
 
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.common.message.MessageQueue;
 
 public class RebalanceLockManager {
     private static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.REBALANCE_LOCK_LOGGER_NAME);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 5f54889..34f9aad 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -696,6 +696,28 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
         return response;
     }
 
+    /**
+     * Answers a max-offset query for a static topic from the local message store.
+     *
+     * @param requestHeader decoded request header; not read by this overload, it only selects it
+     * @param mappingContext mapping context built for the incoming request
+     * @return {@code null} when the context carries no mapping detail, so the caller falls through
+     *         to its regular handling; a {@code NOT_LEADER_FOR_QUEUE} error response when there is
+     *         no mapping item or the item's broker name differs from the mapping detail's broker
+     *         name; otherwise a {@code SUCCESS} response holding the local max offset after it has
+     *         been passed through {@code computeStaticQueueOffset}
+     */
+    private RemotingCommand rewriteRequestForStaticTopic(GetMaxOffsetRequestHeader requestHeader,
+        TopicQueueMappingContext mappingContext) {
+        if (mappingContext.getMappingDetail() == null) {
+            return null;
+        }
+        TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
+        LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
+        if (mappingItem == null
+                || !mappingDetail.getBname().equals(mappingItem.getBname())) {
+            return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
+                String.format("%s-%d does not exist in request process of current broker %s",
+                    mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
+        }
+        // The store is queried with the queue id of the mapping item, not the id in the request.
+        long offset = this.brokerController.getMessageStore()
+            .getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId());
+
+        offset = mappingItem.computeStaticQueueOffset(offset);
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
+        final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
+        responseHeader.setOffset(offset);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
     private RemotingCommand getMaxOffset(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
@@ -703,33 +725,13 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
         final GetMaxOffsetRequestHeader requestHeader =
             (GetMaxOffsetRequestHeader) 
request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
 
-        String topic = requestHeader.getTopic();
-        int queueId = requestHeader.getQueueId();
-
-        if (requestHeader.getLogicalQueue()) {
-            LogicalQueuesInfoInBroker logicalQueuesInfo = 
this.brokerController.getTopicConfigManager().selectLogicalQueuesInfo(topic);
-            if (logicalQueuesInfo != null) {
-                // max offset must be in the queue route with largest offset
-                LogicalQueueRouteData requestLogicalQueueRouteData = 
logicalQueuesInfo.queryQueueRouteDataByQueueId(queueId, Long.MAX_VALUE);
-                if (requestLogicalQueueRouteData != null) {
-                    logicalQueuesInfo.readLock().lock();
-                    try {
-                        List<LogicalQueueRouteData> queueRouteDataList = 
logicalQueuesInfo.get(requestLogicalQueueRouteData.getLogicalQueueIndex());
-                        if (queueRouteDataList != null && 
!queueRouteDataList.isEmpty()) {
-                            LogicalQueueRouteData 
selectedLogicalQueueRouteData = 
queueRouteDataList.get(queueRouteDataList.size() - 1);
-                            if 
(!Objects.equals(selectedLogicalQueueRouteData.getMessageQueue(), new 
MessageQueue(topic, this.brokerController.getBrokerConfig().getBrokerName(), 
queueId))) {
-                                log.info("getMaxOffset topic={} queueId={} not 
latest, redirect: {}", topic, queueId, selectedLogicalQueueRouteData);
-                                
response.addExtField(MessageConst.PROPERTY_REDIRECT, "1");
-                            }
-                        }
-                    } finally {
-                        logicalQueuesInfo.readLock().unlock();
-                    }
-                }
-            }
+        TopicQueueMappingContext mappingContext = 
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
+        RemotingCommand rewriteResult = 
rewriteRequestForStaticTopic(requestHeader, mappingContext);
+        if (rewriteResult != null) {
+            return rewriteResult;
         }
 
-        long offset = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId, 
requestHeader.isCommitted());
+        long offset = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(),
 requestHeader.getQueueId());
 
         responseHeader.setOffset(offset);
 
@@ -738,6 +740,36 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
         return response;
     }
 
+    /**
+     * Answers a min-offset query for a static topic by forwarding it through the broker outer API.
+     *
+     * @param requestHeader decoded request header; forwarded as the payload of the RPC request
+     * @param mappingContext mapping context built for the incoming request
+     * @return {@code null} when the context carries no mapping detail, so the caller falls through
+     *         to its regular handling; a {@code NOT_LEADER_FOR_QUEUE} error response when there is
+     *         no mapping item or the item's broker name differs from the mapping detail's broker
+     *         name; a {@code SYSTEM_ERROR} response when the forwarded call fails; otherwise a
+     *         {@code SUCCESS} response holding the returned offset after it has been passed
+     *         through {@code computeStaticQueueOffset}
+     */
+    private RemotingCommand rewriteRequestForStaticTopic(GetMinOffsetRequestHeader requestHeader,
+        TopicQueueMappingContext mappingContext) {
+        if (mappingContext.getMappingDetail() == null) {
+            return null;
+        }
+        TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
+        LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
+        if (mappingItem == null
+                || !mappingDetail.getBname().equals(mappingItem.getBname())) {
+            return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
+                String.format("%s-%d does not exist in request process of current broker %s",
+                    mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
+        }
+        try {
+            // NOTE(review): the header is forwarded unchanged to the broker name that the guard
+            // above just matched against the mapping detail - confirm the receiver does not apply
+            // the same rewrite again.
+            RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null);
+            RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getMinOffset(
+                mappingItem.getBname(), rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
+            if (rpcResponse.getException() != null) {
+                throw rpcResponse.getException();
+            }
+            GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader();
+            long offset = mappingItem.computeStaticQueueOffset(offsetResponseHeader.getOffset());
+
+            final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
+            final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
+            responseHeader.setOffset(offset);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
+        } catch (Throwable t) {
+            // Keep the stack trace in the broker log; the client only receives the message text.
+            log.error("Failed to get min offset for static topic " + mappingContext.getTopic(), t);
+            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+        }
+    }
+
     private RemotingCommand getMinOffset(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
@@ -745,6 +777,12 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
         final GetMinOffsetRequestHeader requestHeader =
             (GetMinOffsetRequestHeader) 
request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
 
+        TopicQueueMappingContext mappingContext = 
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
 false, 0L);
+        RemotingCommand rewriteResult = 
rewriteRequestForStaticTopic(requestHeader, mappingContext);
+        if (rewriteResult != null) {
+            return rewriteResult;
+        }
+
         long offset = 
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
 requestHeader.getQueueId());
 
         responseHeader.setOffset(offset);
@@ -753,6 +791,35 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
         return response;
     }
 
+    /**
+     * Answers an earliest-message-store-time query for a static topic by forwarding it through
+     * the broker outer API.
+     *
+     * @param requestHeader decoded request header; forwarded as the payload of the RPC request
+     * @param mappingContext mapping context built for the incoming request
+     * @return {@code null} when the context carries no mapping detail, so the caller falls through
+     *         to its regular handling; a {@code NOT_LEADER_FOR_QUEUE} error response when there is
+     *         no mapping item or the item's broker name differs from the mapping detail's broker
+     *         name; a {@code SYSTEM_ERROR} response when the forwarded call fails; otherwise a
+     *         {@code SUCCESS} response carrying the timestamp returned by the forwarded call
+     */
+    private RemotingCommand rewriteRequestForStaticTopic(GetEarliestMsgStoretimeRequestHeader requestHeader,
+        TopicQueueMappingContext mappingContext) {
+        if (mappingContext.getMappingDetail() == null) {
+            return null;
+        }
+        TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
+        LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
+        if (mappingItem == null
+                || !mappingDetail.getBname().equals(mappingItem.getBname())) {
+            return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
+                String.format("%s-%d does not exist in request process of current broker %s",
+                    mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
+        }
+        try {
+            // The request code must match the header type being forwarded; the earlier
+            // GET_MIN_OFFSET here was a copy-paste slip from the min-offset overload.
+            // NOTE(review): the header is forwarded unchanged to the broker name that the guard
+            // above just matched against the mapping detail - confirm the receiver does not apply
+            // the same rewrite again.
+            RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null);
+            RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getEarliestMsgStoretime(
+                mappingItem.getBname(), rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
+            if (rpcResponse.getException() != null) {
+                throw rpcResponse.getException();
+            }
+            GetEarliestMsgStoretimeResponseHeader storetimeResponseHeader =
+                (GetEarliestMsgStoretimeResponseHeader) rpcResponse.getHeader();
+
+            final RemotingCommand response =
+                RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class);
+            final GetEarliestMsgStoretimeResponseHeader responseHeader =
+                (GetEarliestMsgStoretimeResponseHeader) response.readCustomHeader();
+            // The timestamp is copied as returned; unlike offsets it is not remapped.
+            responseHeader.setTimestamp(storetimeResponseHeader.getTimestamp());
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
+        } catch (Throwable t) {
+            // Keep the stack trace in the broker log; the client only receives the message text.
+            log.error("Failed to get earliest message store time for static topic " + mappingContext.getTopic(), t);
+            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+        }
+    }
+
     private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class);
@@ -760,6 +827,12 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
         final GetEarliestMsgStoretimeRequestHeader requestHeader =
             (GetEarliestMsgStoretimeRequestHeader) 
request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class);
 
+        TopicQueueMappingContext mappingContext = 
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
 false, 0L);
+        RemotingCommand rewriteResult = 
rewriteRequestForStaticTopic(requestHeader, mappingContext);
+        if (rewriteResult != null) {
+            return rewriteResult;
+        }
+
         long timestamp =
             
this.brokerController.getMessageStore().getEarliestMessageTime(requestHeader.getTopic(),
 requestHeader.getQueueId());
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 67fd937..2d9f17d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1126,7 +1126,6 @@ public class MQClientAPIImpl {
         requestHeader.setTopic(topic);
         requestHeader.setQueueId(queueId);
         requestHeader.setCommitted(committed);
-        requestHeader.setLogicalQueue(fromLogicalQueue);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
index c64381f..fea1736 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
@@ -20,11 +20,11 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetEarliestMsgStoretimeRequestHeader implements 
CommandCustomHeader {
+public class GetEarliestMsgStoretimeRequestHeader extends 
TopicQueueRequestHeader {
     @CFNotNull
     private String topic;
     @CFNotNull
@@ -34,18 +34,22 @@ public class GetEarliestMsgStoretimeRequestHeader 
implements CommandCustomHeader
     public void checkFields() throws RemotingCommandException {
     }
 
+    @Override
     public String getTopic() {
         return topic;
     }
 
+    @Override
     public void setTopic(String topic) {
         this.topic = topic;
     }
 
+    @Override
     public Integer getQueueId() {
         return queueId;
     }
 
+    @Override
     public void setQueueId(Integer queueId) {
         this.queueId = queueId;
     }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
index 6963195..e4226c2 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
@@ -20,34 +20,37 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
+public class GetMaxOffsetRequestHeader extends TopicQueueRequestHeader {
     @CFNotNull
     private String topic;
     @CFNotNull
     private Integer queueId;
     private boolean committed;
-    private boolean logicalQueue;
 
     @Override
     public void checkFields() throws RemotingCommandException {
     }
 
+    @Override
     public String getTopic() {
         return topic;
     }
 
+    @Override
     public void setTopic(String topic) {
         this.topic = topic;
     }
 
+    @Override
     public Integer getQueueId() {
         return queueId;
     }
 
+    @Override
     public void setQueueId(Integer queueId) {
         this.queueId = queueId;
     }
@@ -59,12 +62,4 @@ public class GetMaxOffsetRequestHeader implements 
CommandCustomHeader {
     public boolean isCommitted() {
         return committed;
     }
-
-    public void setLogicalQueue(boolean logicalQueue) {
-        this.logicalQueue = logicalQueue;
-    }
-
-    public boolean getLogicalQueue() {
-        return logicalQueue;
-    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
index 6fb8ed4..6889ae8 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
@@ -20,11 +20,11 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetMinOffsetRequestHeader implements CommandCustomHeader {
+public class GetMinOffsetRequestHeader extends TopicQueueRequestHeader {
     @CFNotNull
     private String topic;
     @CFNotNull
@@ -34,18 +34,22 @@ public class GetMinOffsetRequestHeader implements 
CommandCustomHeader {
     public void checkFields() throws RemotingCommandException {
     }
 
+    @Override
     public String getTopic() {
         return topic;
     }
 
+    @Override
     public void setTopic(String topic) {
         this.topic = topic;
     }
 
+    @Override
     public Integer getQueueId() {
         return queueId;
     }
 
+    @Override
     public void setQueueId(Integer queueId) {
         this.queueId = queueId;
     }

Reply via email to