This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by 
this push:
     new 71af9c7  Finish the logic for SendProcessor
71af9c7 is described below

commit 71af9c75cfdfc3a0a026b6524c45daea4b68cd65
Author: dongeforever <[email protected]>
AuthorDate: Wed Nov 10 14:23:12 2021 +0800

    Finish the logic for SendProcessor
---
 .../broker/processor/SendMessageProcessor.java     | 75 +++++++++++++++++-----
 .../client/impl/producer/TopicPublishInfo.java     |  5 --
 .../rocketmq/common/LogicQueueMappingItem.java     |  4 ++
 .../rocketmq/common/TopicQueueMappingDetail.java   | 18 ++++++
 .../rocketmq/common/protocol/ResponseCode.java     |  2 +
 5 files changed, 82 insertions(+), 22 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 91463a4..9daebf4 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.help.FAQUrl;
@@ -96,6 +97,10 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
                 if (requestHeader == null) {
                     return CompletableFuture.completedFuture(null);
                 }
+                RemotingCommand rewriteResult =  
rewriteRequestForStaticTopic(requestHeader);
+                if (rewriteResult != null) {
+                    return CompletableFuture.completedFuture(rewriteResult);
+                }
                 mqtraceContext = buildMsgContext(ctx, requestHeader);
                 this.executeSendMessageHookBefore(ctx, request, 
mqtraceContext);
                 if (requestHeader.isBatch()) {
@@ -106,6 +111,54 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
         }
     }
 
+    private RemotingCommand buildErrorResponse(int code, String remark) {
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+        response.setCode(code);
+        response.setRemark(remark);
+        return response;
+    }
+    /**
+     * If the response is not null, it meets some errors
+     * @param requestHeader
+     * @return
+     */
+    private RemotingCommand 
rewriteRequestForStaticTopic(SendMessageRequestHeader requestHeader) {
+        try {
+            TopicQueueMappingDetail mappingDetail = 
this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
+            if (mappingDetail == null) {
+                return null;
+            }
+            if 
(!mappingDetail.getCurrIdMap().containsKey(requestHeader.getQueueId())) {
+               return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, 
String.format("%s-%d does not exit in current broker %s", 
requestHeader.getTopic(), requestHeader.getQueueId(), 
this.brokerController.getBrokerConfig().getBrokerName()));
+            }
+            
requestHeader.setQueueId(mappingDetail.getCurrIdMap().get(requestHeader.getQueueId()));
+        } catch (Throwable t) {
+            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, 
t.getMessage());
+        }
+        return null;
+    }
+
+    private RemotingCommand rewriteResponseForStaticTopic(String topic, 
SendMessageResponseHeader responseHeader) {
+        try {
+            TopicQueueMappingDetail mappingDetail = 
this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
+            if (mappingDetail == null) {
+                return null;
+            }
+            if 
(!mappingDetail.getCurrIdMap().containsKey(responseHeader.getQueueId())) {
+                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, 
String.format("%s-%d does not exit in current broker %s", topic, 
responseHeader.getQueueId(), 
this.brokerController.getBrokerConfig().getBrokerName()));
+            }
+            long staticLogicOffset = 
mappingDetail.convertToLogicOffset(responseHeader.getQueueId(), 
responseHeader.getQueueOffset());
+            if (staticLogicOffset < 0) {
+                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, 
String.format("%s-%d convert offset error in current broker %s", topic, 
responseHeader.getQueueId(), 
this.brokerController.getBrokerConfig().getBrokerName()));
+
+            }
+            responseHeader.setQueueOffset(staticLogicOffset);
+        } catch (Throwable t) {
+            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, 
t.getMessage());
+        }
+        return null;
+    }
+
     @Override
     public boolean rejectRequest() {
         return this.brokerController.getMessageStore().isOSPageCacheBusy() ||
@@ -310,12 +363,6 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
             
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
         }
 
-        LogicalQueueContext logicalQueueContext = 
super.buildLogicalQueueContext(msgInner.getTopic(), msgInner.getQueueId(), 
response);
-        CompletableFuture<RemotingCommand> future = 
logicalQueueContext.hookBeforePut(ctx, requestHeader, request, response);
-        if (future != null) {
-            return future;
-        }
-
         CompletableFuture<PutMessageResult> putMessageResult = null;
         String transFlag = 
origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
         if (transFlag != null && Boolean.parseBoolean(transFlag)) {
@@ -324,14 +371,12 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
                 response.setRemark(
                         "the broker[" + 
this.brokerController.getBrokerConfig().getBrokerIP1()
                                 + "] sending transaction message is 
forbidden");
-                logicalQueueContext.hookAfterPut(null);
                 return CompletableFuture.completedFuture(response);
             }
             putMessageResult = 
this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
         } else {
             putMessageResult = 
this.brokerController.getMessageStore().asyncPutMessage(msgInner);
         }
-        logicalQueueContext.hookAfterPut(putMessageResult);
         return handlePutMessageResultFuture(putMessageResult, response, 
request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
     }
 
@@ -549,6 +594,11 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
             responseHeader.setQueueId(queueIdInt);
             
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
 
+            RemotingCommand rewriteResult =  
rewriteResponseForStaticTopic(msg.getTopic(), responseHeader);
+            if (rewriteResult != null) {
+                return rewriteResult;
+            }
+
             doResponse(ctx, request, response);
 
             if (hasSendMessageHook()) {
@@ -623,16 +673,7 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
         String clusterName = 
this.brokerController.getBrokerConfig().getBrokerClusterName();
         MessageAccessor.putProperty(messageExtBatch, 
MessageConst.PROPERTY_CLUSTER, clusterName);
 
-        LogicalQueueContext logicalQueueContext = 
super.buildLogicalQueueContext(messageExtBatch.getTopic(), 
messageExtBatch.getQueueId(), response);
-        CompletableFuture<RemotingCommand> future = 
logicalQueueContext.hookBeforePut(ctx, requestHeader, request, response);
-        if (future != null) {
-            return future;
-        }
-
         CompletableFuture<PutMessageResult> putMessageResult = 
this.brokerController.getMessageStore().asyncPutMessages(messageExtBatch);
-
-        logicalQueueContext.hookAfterPut(putMessageResult);
-
         return handlePutMessageResultFuture(putMessageResult, response, 
request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt);
     }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
index 60974cc..2f8337e 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -17,13 +17,9 @@
 package org.apache.rocketmq.client.impl.producer;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-
 import org.apache.rocketmq.client.common.ThreadLocalIndex;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 
@@ -120,5 +116,4 @@ public class TopicPublishInfo {
     public void setTopicRouteData(final TopicRouteData topicRouteData) {
         this.topicRouteData = topicRouteData;
     }
-
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java 
b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
index 50d88ae..78a27dc 100644
--- a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
@@ -18,6 +18,10 @@ public class LogicQueueMappingItem {
         this.timeOfStart = timeOfStart;
     }
 
+    public long convertToStaticLogicOffset(long physicalLogicOffset) {
+        return  logicOffset + (physicalLogicOffset - startOffset);
+    }
+
     public int getGen() {
         return gen;
     }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java 
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index d3e3d92..a181130 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -81,6 +81,24 @@ public class TopicQueueMappingDetail extends 
TopicQueueMappingInfo {
     }
 
 
+    public long convertToLogicOffset(Integer globalId, long 
physicalLogicOffset) {
+        List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
+        if (mappingItems == null
+                || mappingItems.isEmpty()) {
+            return -1;
+        }
+        if (bname.equals(mappingItems.get(mappingItems.size() - 
1).getBname())) {
+            return mappingItems.get(mappingItems.size() - 
1).convertToStaticLogicOffset(physicalLogicOffset);
+        }
+        //Consider the "switch" process, reduce the error
+        if (mappingItems.size() >= 2
+            && bname.equals(mappingItems.get(mappingItems.size() - 
2).getBname())) {
+            return mappingItems.get(mappingItems.size() - 
2).convertToStaticLogicOffset(physicalLogicOffset);
+        }
+        return -1;
+    }
+
+
     public TopicQueueMappingInfo cloneAsMappingInfo() {
         TopicQueueMappingInfo topicQueueMappingInfo = new 
TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname);
         topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java 
b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index 42b9c4f..3944c18 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -84,4 +84,6 @@ public class ResponseCode extends RemotingSysResponseCode {
     public static final int POLLING_FULL = 209;
 
     public static final int POLLING_TIMEOUT = 210;
+
+    public static final int NOT_LEADER_FOR_QUEUE = 501;
 }

Reply via email to