This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by 
this push:
     new 5e04a27  Finish the rewrite topic for pull and send process
5e04a27 is described below

commit 5e04a2782f3621ca4ff97711586f5826068db71f
Author: dongeforever <[email protected]>
AuthorDate: Wed Nov 10 20:13:17 2021 +0800

    Finish the rewrite topic for pull and send process
---
 .../apache/rocketmq/broker/BrokerController.java   |   2 +-
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |  30 ++-
 .../broker/processor/PullMessageProcessor.java     | 208 ++++++++++++++++++---
 .../broker/processor/SendMessageProcessor.java     |  74 +++++---
 .../rocketmq/common/LogicQueueMappingItem.java     |  38 +++-
 .../rocketmq/common/TopicQueueMappingContext.java  |  73 ++++++++
 .../rocketmq/common/TopicQueueMappingDetail.java   |  47 +++--
 .../protocol/header/PullMessageRequestHeader.java  |   4 +-
 .../protocol/header/PullMessageResponseHeader.java |  11 ++
 .../protocol/header/SendMessageRequestHeader.java  |   4 +-
 .../apache/rocketmq/remoting/RequestHeader.java    |  29 +++
 11 files changed, 444 insertions(+), 76 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index e08dbbe..03fd661 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -236,7 +236,7 @@ public class BrokerController {
         this.clientHousekeepingService = new ClientHousekeepingService(this);
         this.broker2Client = new Broker2Client(this);
         this.subscriptionGroupManager = new SubscriptionGroupManager(this);
-        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
+        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig, this);
         this.filterServerManager = new FilterServerManager(this);
 
         this.assignmentManager = new AssignmentManager(this);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index a775864..7f3ce81 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -23,8 +23,12 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
+import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -41,6 +45,8 @@ import 
org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import 
org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
@@ -66,18 +72,20 @@ import 
org.apache.rocketmq.remoting.protocol.RemotingCommand;
 public class BrokerOuterAPI {
     private static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final RemotingClient remotingClient;
+    private final BrokerController brokerController;
     private final TopAddressing topAddressing = new 
TopAddressing(MixAll.getWSAddr());
     private String nameSrvAddr = null;
     private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new 
BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
         new ArrayBlockingQueue<Runnable>(32), new 
ThreadFactoryImpl("brokerOutApi_thread_", true));
 
-    public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
-        this(nettyClientConfig, null);
+    public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, final 
BrokerController brokerController) {
+        this(nettyClientConfig, null, brokerController);
     }
 
-    public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook 
rpcHook) {
+    public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook 
rpcHook, final BrokerController brokerController) {
         this.remotingClient = new NettyRemotingClient(nettyClientConfig);
         this.remotingClient.registerRPCHook(rpcHook);
+        this.brokerController = brokerController;
     }
 
     public void start() {
@@ -454,4 +462,20 @@ public class BrokerOuterAPI {
     public void forwardRequest(String brokerAddr, RemotingCommand request, 
long timeoutMillis, InvokeCallback invokeCallback) throws InterruptedException, 
RemotingSendRequestException, RemotingTimeoutException, 
RemotingTooMuchRequestException, RemotingConnectException {
         this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, 
invokeCallback);
     }
+
+    /**
+     * Forwards a pull request to another broker and waits synchronously for its reply.
+     *
+     * @param brokerName name of the target broker; its address is looked up through the broker controller
+     * @param requestHeader pull request header sent to the target broker
+     * @param timeoutMillis timeout handed to the synchronous remoting call
+     * @return the target broker's response, or a locally built SYSTEM_ERROR response when no address
+     *         is found for {@code brokerName}
+     * @throws Exception propagated from the address lookup or the synchronous remoting call
+     */
+    public RemotingCommand pullMessage(String brokerName, PullMessageRequestHeader requestHeader,
+        long timeoutMillis) throws Exception {
+        final String targetAddr = this.brokerController.getBrokerAddrByName(brokerName);
+        if (targetAddr != null) {
+            final RemotingCommand forwardRequest =
+                RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
+            final RemotingCommand remoteResponse =
+                this.remotingClient.invokeSync(targetAddr, forwardRequest, timeoutMillis);
+            assert remoteResponse != null;
+            return remoteResponse;
+        }
+
+        // No address is known for the target broker, so answer locally instead of forwarding.
+        final RemotingCommand errorResponse =
+            RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+        errorResponse.setCode(ResponseCode.SYSTEM_ERROR);
+        errorResponse.setRemark(String.format("%s-%d cannot find addr when forward to broker %s in broker %s",
+            requestHeader.getTopic(), requestHeader.getQueueId(), brokerName,
+            this.brokerController.getBrokerConfig().getBrokerName()));
+        return errorResponse;
+    }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index e61ef11..5ab3c01 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -24,6 +24,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.FileRegion;
 import java.nio.ByteBuffer;
 import java.util.List;
+
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker;
@@ -35,9 +36,15 @@ import org.apache.rocketmq.broker.longpolling.PullRequest;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
 import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
+import org.apache.rocketmq.common.LogicQueueMappingItem;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.TopicQueueMappingContext;
+import org.apache.rocketmq.common.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.filter.ExpressionType;
@@ -49,6 +56,8 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
@@ -95,6 +104,132 @@ public class PullMessageProcessor extends 
AsyncNettyRequestProcessor implements
         return false;
     }
 
+
+    /**
+     * Creates a pull response command that carries only a response code and a remark.
+     *
+     * @param code response code to set on the command
+     * @param remark remark to set on the command
+     * @return the newly created response command
+     */
+    private RemotingCommand buildErrorResponse(int code, String remark) {
+        final RemotingCommand errorResponse =
+            RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+        errorResponse.setRemark(remark);
+        errorResponse.setCode(code);
+        return errorResponse;
+    }
+
+    /**
+     * Builds the queue-mapping context for a pull request.
+     *
+     * @param requestHeader the incoming pull request header
+     * @return {@code null} when the request is flagged as physical or the topic has no queue mapping
+     *         registered; otherwise a context holding the topic, the requested queue id and offset,
+     *         the mapping detail and the mapping item looked up for that queue id and offset
+     *         (the item may be {@code null}; the caller checks for that)
+     */
+    private TopicQueueMappingContext buildTopicQueueMappingContext(PullMessageRequestHeader requestHeader) {
+        // Boolean.TRUE.equals(null) is false, so a missing flag falls through to the mapping lookup.
+        if (Boolean.TRUE.equals(requestHeader.getPhysical())) {
+            return null;
+        }
+        final String topic = requestHeader.getTopic();
+        final TopicQueueMappingDetail detail =
+            this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
+        if (detail == null) {
+            //it is not static topic
+            return null;
+        }
+        final Integer globalId = requestHeader.getQueueId();
+        final Long globalOffset = requestHeader.getQueueOffset();
+        final LogicQueueMappingItem item = detail.getLogicQueueMappingItem(globalId, globalOffset);
+        return new TopicQueueMappingContext(topic, globalId, globalOffset, detail, item);
+    }
+
+    /**
+     * Rewrites a pull request for a static topic from global queue coordinates to the queue id and
+     * offset given by the mapping item, forwarding it when the mapping item names another broker.
+     *
+     * @param requestHeader pull request header; its queue id, queue offset, max message number and
+     *                      physical flag may be overwritten in place
+     * @param mappingContext mapping context of the request, or {@code null} when no rewrite applies
+     * @return {@code null} when the request should continue to be processed locally; otherwise the
+     *         response to return to the caller (an error response or the forwarded broker's response)
+     */
+    private RemotingCommand rewriteRequestForStaticTopic(PullMessageRequestHeader requestHeader,
+        TopicQueueMappingContext mappingContext) {
+        try {
+            if (mappingContext == null) {
+                return null;
+            }
+            String topic = mappingContext.getTopic();
+            Integer globalId = mappingContext.getGlobalId();
+            Long globalOffset = mappingContext.getGlobalOffset();
+
+            LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
+            if (mappingItem == null) {
+                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
+                    String.format("%s-%d cannot find mapping item in request process of current broker %s",
+                        topic, globalId, this.brokerController.getBrokerConfig().getBrokerName()));
+            }
+
+            if (globalOffset < mappingItem.getStartOffset()) {
+                log.warn("{}-{} fetch offset {} smaller than the min mapping offset {}",
+                    topic, globalId, globalOffset, mappingItem.getStartOffset());
+                // String.format needs %-style specifiers; "{}" placeholders are only understood by the logger.
+                return buildErrorResponse(ResponseCode.PULL_OFFSET_MOVED,
+                    String.format("%s-%d fetch offset %d smaller than the min mapping offset %d in broker %s",
+                        topic, globalId, globalOffset, mappingItem.getStartOffset(),
+                        this.brokerController.getBrokerConfig().getBrokerName()));
+            }
+            //below are physical info
+            String bname = mappingItem.getBname();
+            Integer phyQueueId = mappingItem.getQueueId();
+            Long phyQueueOffset = mappingItem.convertToPhysicalQueueOffset(globalOffset);
+            requestHeader.setQueueId(phyQueueId);
+            requestHeader.setQueueOffset(phyQueueOffset);
+            if (mappingItem.isEndOffsetDecided()
+                    && requestHeader.getMaxMsgNums() != null) {
+                // cap the batch size by the span of this mapping item
+                requestHeader.setMaxMsgNums((int) Math.min(
+                    mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums()));
+            }
+
+            if (this.brokerController.getBrokerConfig().getBrokerName().equals(bname)) {
+                //just let it go
+                return null;
+            }
+
+            // The mapping item names another broker: mark the request as physical and forward it.
+            requestHeader.setPhysical(true);
+            RemotingCommand response = this.brokerController.getBrokerOuterAPI().pullMessage(bname, requestHeader,
+                this.brokerController.getBrokerConfig().getForwardTimeout());
+            switch (response.getCode()) {
+                case ResponseCode.SYSTEM_ERROR:
+                    return response;
+                case ResponseCode.SUCCESS:
+                case ResponseCode.PULL_NOT_FOUND:
+                case ResponseCode.PULL_RETRY_IMMEDIATELY:
+                case ResponseCode.PULL_OFFSET_MOVED:
+                    break;
+                default:
+                    // caught below and reported to the caller as SYSTEM_ERROR
+                    throw new MQBrokerException(response.getCode(), response.getRemark(), mappingItem.getBname());
+            }
+            // NOTE(review): the decoded header object is modified by the rewrite below; confirm that
+            // the returned response actually carries those changes.
+            PullMessageResponseHeader responseHeader =
+                    (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+            {
+                RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext);
+                if (rewriteResult != null) {
+                    return rewriteResult;
+                }
+            }
+            return response;
+        } catch (Throwable t) {
+            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+        }
+    }
+
+    /**
+     * Rewrites the offsets in a pull response header through the mapping item of a static topic.
+     * The next-begin, min and max offsets and the offset delta of {@code responseHeader} are
+     * overwritten in place.
+     *
+     * @param requestHeader the pull request header (only read, for an assertion on the next offset)
+     * @param responseHeader the response header to rewrite
+     * @param mappingContext mapping context of the request, or {@code null} when no rewrite applies
+     * @return {@code null} when nothing had to be reported; a SYSTEM_ERROR response when anything is
+     *         thrown during the rewrite
+     */
+    private RemotingCommand rewriteResponseForStaticTopic(PullMessageRequestHeader requestHeader,
+        PullMessageResponseHeader responseHeader, TopicQueueMappingContext mappingContext) {
+        if (mappingContext == null) {
+            return null;
+        }
+        try {
+            final TopicQueueMappingDetail detail = mappingContext.getMappingDetail();
+            final LogicQueueMappingItem item = mappingContext.getMappingItem();
+
+            //handle nextBeginOffset: it should be no more than the end offset once that is decided
+            long nextOffset = responseHeader.getNextBeginOffset();
+            assert nextOffset >= requestHeader.getQueueOffset();
+            if (item.isEndOffsetDecided()) {
+                nextOffset = Math.min(nextOffset, item.getEndOffset());
+            }
+            responseHeader.setNextBeginOffset(item.convertToStaticQueueOffset(nextOffset));
+
+            //handle min offset: never report less than the start offset of the mapping item
+            final long clampedMinOffset = Math.max(item.getStartOffset(), responseHeader.getMinOffset());
+            responseHeader.setMinOffset(item.convertToStaticQueueOffset(clampedMinOffset));
+
+            //handle max offset
+            if (!item.isEndOffsetDecided()) {
+                responseHeader.setMaxOffset(item.convertToStaticQueueOffset(responseHeader.getMaxOffset()));
+            } else {
+                responseHeader.setMaxOffset(Math.max(item.convertToMaxStaticQueueOffset(),
+                    detail.getMaxOffsetFromMapping(mappingContext.getGlobalId())));
+            }
+
+            //set the offsetDelta
+            responseHeader.setOffsetDelta(item.convertOffsetDelta());
+            return null;
+        } catch (Throwable t) {
+            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+        }
+    }
+
+
     private RemotingCommand processRequest(final Channel channel, 
RemotingCommand request, boolean brokerAllowSuspend)
         throws RemotingCommandException {
         RemotingCommand response = 
RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
@@ -147,6 +282,15 @@ public class PullMessageProcessor extends 
AsyncNettyRequestProcessor implements
             return response;
         }
 
+        TopicQueueMappingContext mappingContext = 
buildTopicQueueMappingContext(requestHeader);
+
+        {
+            RemotingCommand rewriteResult = 
rewriteRequestForStaticTopic(requestHeader, mappingContext);
+            if (rewriteResult != null) {
+                return rewriteResult;
+            }
+        }
+
         int queueId = requestHeader.getQueueId();
         if (queueId < 0 || queueId >= topicConfig.getReadQueueNums()) {
             String errorInfo = String.format("queueId[%d] is illegal, 
topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
@@ -395,6 +539,15 @@ public class PullMessageProcessor extends 
AsyncNettyRequestProcessor implements
                     break;
             }
 
+            //rewrite the response for the static topic
+            {
+                RemotingCommand rewriteResult =  
rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext);
+                if (rewriteResult != null) {
+                    return rewriteResult;
+                }
+            }
+
+
             if (this.hasConsumeMessageHook()) {
                 ConsumeMessageContext context = new ConsumeMessageContext();
                 context.setConsumerGroup(requestHeader.getConsumerGroup());
@@ -507,31 +660,7 @@ public class PullMessageProcessor extends 
AsyncNettyRequestProcessor implements
                 case ResponseCode.PULL_RETRY_IMMEDIATELY:
                     break;
                 case ResponseCode.PULL_OFFSET_MOVED:
-                    if 
(this.brokerController.getMessageStoreConfig().getBrokerRole() != 
BrokerRole.SLAVE
-                        || 
this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
-                        MessageQueue mq = new MessageQueue();
-                        mq.setTopic(requestHeader.getTopic());
-                        mq.setQueueId(requestHeader.getQueueId());
-                        
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
-
-                        OffsetMovedEvent event = new OffsetMovedEvent();
-                        
event.setConsumerGroup(requestHeader.getConsumerGroup());
-                        event.setMessageQueue(mq);
-                        event.setOffsetRequest(requestHeader.getQueueOffset());
-                        event.setOffsetNew(nextBeginOffset);
-                        this.generateOffsetMovedEvent(event);
-                        log.warn(
-                            "PULL_OFFSET_MOVED:correction offset. topic={}, 
groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
-                            requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), event.getOffsetRequest(), 
event.getOffsetNew(),
-                            responseHeader.getSuggestWhichBrokerId());
-                    } else {
-                        
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
-                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
-                        log.warn("PULL_OFFSET_MOVED:none correction. topic={}, 
groupId={}, requestOffset={}, suggestBrokerId={}",
-                            requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
-                            responseHeader.getSuggestWhichBrokerId());
-                    }
-
+                    handleOffsetMoved(requestHeader, responseHeader, response, 
nextBeginOffset, subscriptionGroupConfig);
                     break;
                 default:
                     assert false;
@@ -552,6 +681,35 @@ public class PullMessageProcessor extends 
AsyncNettyRequestProcessor implements
         return response;
     }
 
+    /**
+     * Handles a PULL_OFFSET_MOVED result. Unless this broker is a slave with the slave offset check
+     * switched off, an offset-moved event is generated for the requested queue. Otherwise the response
+     * is downgraded to PULL_RETRY_IMMEDIATELY and the subscription group's broker id is suggested.
+     *
+     * @param requestHeader the pull request header (topic, queue id, consumer group, requested offset)
+     * @param responseHeader the pull response header; its suggested broker id may be overwritten
+     * @param response the response command; its code may be overwritten
+     * @param nextBeginOffset the corrected offset recorded in the generated event
+     * @param subscriptionGroupConfig subscription group configuration supplying the suggested broker id
+     */
+    public void handleOffsetMoved(PullMessageRequestHeader requestHeader, PullMessageResponseHeader responseHeader,
+        RemotingCommand response, long nextBeginOffset, SubscriptionGroupConfig subscriptionGroupConfig) {
+        final boolean isSlave =
+            this.brokerController.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE;
+        if (isSlave && !this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
+            // No correction on this broker: ask the consumer to retry against the suggested broker.
+            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
+            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+            log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
+                requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
+                responseHeader.getSuggestWhichBrokerId());
+            return;
+        }
+
+        final MessageQueue movedQueue = new MessageQueue();
+        movedQueue.setTopic(requestHeader.getTopic());
+        movedQueue.setQueueId(requestHeader.getQueueId());
+        movedQueue.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
+
+        final OffsetMovedEvent movedEvent = new OffsetMovedEvent();
+        movedEvent.setConsumerGroup(requestHeader.getConsumerGroup());
+        movedEvent.setMessageQueue(movedQueue);
+        movedEvent.setOffsetRequest(requestHeader.getQueueOffset());
+        movedEvent.setOffsetNew(nextBeginOffset);
+        this.generateOffsetMovedEvent(movedEvent);
+        log.warn(
+            "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
+            requestHeader.getTopic(), requestHeader.getConsumerGroup(), movedEvent.getOffsetRequest(),
+            movedEvent.getOffsetNew(), responseHeader.getSuggestWhichBrokerId());
+    }
+
     private void prepareRedirectResponse(RemotingCommand response, 
LogicalQueuesInfoInBroker logicalQueuesInfo,
         LogicalQueueRouteData queueRouteData) {
         LogicalQueueRouteData nextReadableLogicalQueueRouteData = 
logicalQueuesInfo.nextAvailableLogicalRouteData(queueRouteData, 
LogicalQueueRouteData::isReadable);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 7b88c75..62e2828 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.TopicQueueMappingContext;
 import org.apache.rocketmq.common.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.PermName;
@@ -98,38 +99,53 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
                 if (requestHeader == null) {
                     return CompletableFuture.completedFuture(null);
                 }
-                RemotingCommand rewriteResult =  
rewriteRequestForStaticTopic(requestHeader);
+                TopicQueueMappingContext mappingContext = 
buildTopicQueueMappingContext(requestHeader);
+                RemotingCommand rewriteResult =  
rewriteRequestForStaticTopic(requestHeader, mappingContext);
                 if (rewriteResult != null) {
                     return CompletableFuture.completedFuture(rewriteResult);
                 }
                 mqtraceContext = buildMsgContext(ctx, requestHeader);
                 this.executeSendMessageHookBefore(ctx, request, 
mqtraceContext);
                 if (requestHeader.isBatch()) {
-                    return this.asyncSendBatchMessage(ctx, request, 
mqtraceContext, requestHeader);
+                    return this.asyncSendBatchMessage(ctx, request, 
mqtraceContext, requestHeader, mappingContext);
                 } else {
-                    return this.asyncSendMessage(ctx, request, mqtraceContext, 
requestHeader);
+                    return this.asyncSendMessage(ctx, request, mqtraceContext, 
requestHeader, mappingContext);
                 }
         }
     }
 
+
+
     private RemotingCommand buildErrorResponse(int code, String remark) {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
         response.setCode(code);
         response.setRemark(remark);
         return response;
     }
+
+    /**
+     * Builds the queue-mapping context for a send request.
+     *
+     * @param requestHeader the incoming send request header
+     * @return {@code null} when the request is flagged as physical or the topic has no queue mapping
+     *         registered; otherwise a context holding the topic, the requested queue id and the
+     *         mapping detail (global offset and mapping item are left {@code null})
+     */
+    private TopicQueueMappingContext buildTopicQueueMappingContext(SendMessageRequestHeader requestHeader) {
+        // Boolean.TRUE.equals(null) is false, so a missing flag falls through to the mapping lookup.
+        if (Boolean.TRUE.equals(requestHeader.getPhysical())) {
+            return null;
+        }
+        final String topic = requestHeader.getTopic();
+        final TopicQueueMappingDetail detail =
+            this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
+        if (detail == null) {
+            //it is not static topic
+            return null;
+        }
+        return new TopicQueueMappingContext(topic, requestHeader.getQueueId(), null, detail, null);
+    }
     /**
      * If the response is not null, it meets some errors
      * @param requestHeader
      * @return
      */
-    private RemotingCommand 
rewriteRequestForStaticTopic(SendMessageRequestHeader requestHeader) {
+    private RemotingCommand 
rewriteRequestForStaticTopic(SendMessageRequestHeader requestHeader, 
TopicQueueMappingContext mappingContext) {
         try {
-            TopicQueueMappingDetail mappingDetail = 
this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
-            if (mappingDetail == null) {
-                //it is not static topic
+            if (mappingContext == null) {
                 return null;
             }
+            TopicQueueMappingDetail mappingDetail = 
mappingContext.getMappingDetail();
             Integer phyQueueId = null;
             //compatible with the old logic, but it fact, this should not 
happen
             if (requestHeader.getQueueId() < 0) {
@@ -151,22 +167,18 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
         }
     }
 
-    private RemotingCommand rewriteResponseForStaticTopic(String topic, 
SendMessageResponseHeader responseHeader) {
+    private RemotingCommand 
rewriteResponseForStaticTopic(SendMessageRequestHeader requestHeader, 
SendMessageResponseHeader responseHeader,  TopicQueueMappingContext 
mappingContext) {
         try {
-            TopicQueueMappingDetail mappingDetail = 
this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
-            if (mappingDetail == null) {
+            if (mappingContext == null) {
                 return null;
             }
-            Integer globalId = 
mappingDetail.getCurrIdMapRevert().get(responseHeader.getQueueId());
-            if (globalId == null) {
-                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, 
String.format("%s-%d does not exist in response process of current broker %s", 
topic, responseHeader.getQueueId(), 
this.brokerController.getBrokerConfig().getBrokerName()));
-            }
-            long staticLogicOffset = 
mappingDetail.convertToLogicOffset(globalId, responseHeader.getQueueOffset());
-            if (staticLogicOffset < 0) {
-                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, 
String.format("%s-%d convert offset error in current broker %s", topic, 
responseHeader.getQueueId(), 
this.brokerController.getBrokerConfig().getBrokerName()));
+            TopicQueueMappingDetail mappingDetail = 
mappingContext.getMappingDetail();
 
+            long staticLogicOffset = 
mappingDetail.convertToLogicOffset(mappingContext.getGlobalId(), 
responseHeader.getQueueOffset());
+            if (staticLogicOffset < 0) {
+                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, 
String.format("%s-%d convert offset error in current broker %s", 
mappingContext.getTopic(), responseHeader.getQueueId(), 
this.brokerController.getBrokerConfig().getBrokerName()));
             }
-            responseHeader.setQueueId(globalId);
+            responseHeader.setQueueId(mappingContext.getGlobalId());
             responseHeader.setQueueOffset(staticLogicOffset);
         } catch (Throwable t) {
             return buildErrorResponse(ResponseCode.SYSTEM_ERROR, 
t.getMessage());
@@ -332,7 +344,8 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
 
     private CompletableFuture<RemotingCommand> 
asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                 
SendMessageContext mqtraceContext,
-                                                                
SendMessageRequestHeader requestHeader) {
+                                                                
SendMessageRequestHeader requestHeader,
+                                                                
TopicQueueMappingContext mappingContext) {
         final RemotingCommand response = preSend(ctx, request, requestHeader);
         final SendMessageResponseHeader responseHeader = 
(SendMessageResponseHeader)response.readCustomHeader();
 
@@ -392,7 +405,7 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
         } else {
             putMessageResult = 
this.brokerController.getMessageStore().asyncPutMessage(msgInner);
         }
-        return handlePutMessageResultFuture(putMessageResult, response, 
request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
+        return handlePutMessageResultFuture(putMessageResult, response, 
request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt, 
requestHeader, mappingContext);
     }
 
     private CompletableFuture<RemotingCommand> 
handlePutMessageResultFuture(CompletableFuture<PutMessageResult> 
putMessageResult,
@@ -402,9 +415,11 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
                                                                             
SendMessageResponseHeader responseHeader,
                                                                             
SendMessageContext sendMessageContext,
                                                                             
ChannelHandlerContext ctx,
-                                                                            
int queueIdInt) {
+                                                                            
int queueIdInt,
+                                                                            
SendMessageRequestHeader requestHeader,
+                                                                            
TopicQueueMappingContext mappingContext) {
         return putMessageResult.thenApply((r) ->
-            handlePutMessageResult(r, response, request, msgInner, 
responseHeader, sendMessageContext, ctx, queueIdInt)
+            handlePutMessageResult(r, response, request, msgInner, 
responseHeader, sendMessageContext, ctx, queueIdInt, requestHeader, 
mappingContext)
         );
     }
 
@@ -456,7 +471,8 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
     private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                         final RemotingCommand request,
                                         final SendMessageContext 
sendMessageContext,
-                                        final SendMessageRequestHeader 
requestHeader) throws RemotingCommandException {
+                                        final SendMessageRequestHeader 
requestHeader,
+                                        final TopicQueueMappingContext 
mappingContext) throws RemotingCommandException {
 
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
         final SendMessageResponseHeader responseHeader = 
(SendMessageResponseHeader)response.readCustomHeader();
@@ -525,14 +541,15 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
             putMessageResult = 
this.brokerController.getMessageStore().putMessage(msgInner);
         }
 
-        return handlePutMessageResult(putMessageResult, response, request, 
msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
+        return handlePutMessageResult(putMessageResult, response, request, 
msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, requestHeader, 
mappingContext);
 
     }
 
     private RemotingCommand handlePutMessageResult(PutMessageResult 
putMessageResult, RemotingCommand response,
                                                    RemotingCommand request, 
MessageExt msg,
                                                    SendMessageResponseHeader 
responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext 
ctx,
-                                                   int queueIdInt) {
+                                                   int queueIdInt, 
SendMessageRequestHeader requestHeader,
+                                                   TopicQueueMappingContext 
mappingContext) {
         if (putMessageResult == null) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark("store putMessage return null");
@@ -609,7 +626,7 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
             responseHeader.setQueueId(queueIdInt);
             
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
 
-            RemotingCommand rewriteResult =  
rewriteResponseForStaticTopic(msg.getTopic(), responseHeader);
+            RemotingCommand rewriteResult =  
rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext);
             if (rewriteResult != null) {
                 return rewriteResult;
             }
@@ -647,7 +664,8 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
 
     private CompletableFuture<RemotingCommand> 
asyncSendBatchMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                      
SendMessageContext mqtraceContext,
-                                                                     
SendMessageRequestHeader requestHeader) {
+                                                                     
SendMessageRequestHeader requestHeader,
+                                                                     
TopicQueueMappingContext mappingContext) {
         final RemotingCommand response = preSend(ctx, request, requestHeader);
         final SendMessageResponseHeader responseHeader = 
(SendMessageResponseHeader)response.readCustomHeader();
 
@@ -689,7 +707,7 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
         MessageAccessor.putProperty(messageExtBatch, 
MessageConst.PROPERTY_CLUSTER, clusterName);
 
         CompletableFuture<PutMessageResult> putMessageResult = 
this.brokerController.getMessageStore().asyncPutMessages(messageExtBatch);
-        return handlePutMessageResultFuture(putMessageResult, response, 
request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt);
+        return handlePutMessageResultFuture(putMessageResult, response, 
request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt, 
requestHeader, mappingContext);
     }
 
 
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java 
b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
index 78a27dc..0a9ee96 100644
--- a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java
@@ -7,6 +7,7 @@ public class LogicQueueMappingItem {
     private String bname;
     private long logicOffset; // the start of the logic offset
     private long startOffset; // the start of the physical offset
+    private long endOffset; // the end of the physical offset
     private long timeOfStart = -1; //mutable
 
     public LogicQueueMappingItem(int gen, int queueId, String bname, long 
logicOffset, long startOffset, long timeOfStart) {
@@ -18,8 +19,32 @@ public class LogicQueueMappingItem {
         this.timeOfStart = timeOfStart;
     }
 
-    public long convertToStaticLogicOffset(long physicalLogicOffset) {
-        return  logicOffset + (physicalLogicOffset - startOffset);
+    public long convertToStaticQueueOffset(long physicalQueueOffset) {
+        return  logicOffset + (physicalQueueOffset - startOffset);
+    }
+
+    public long convertToPhysicalQueueOffset(long staticQueueOffset) {
+        return  (staticQueueOffset - logicOffset) + startOffset;
+    }
+
+    public long convertToMaxStaticQueueOffset() {
+        if (endOffset >= startOffset) {
+            return logicOffset + endOffset - startOffset;
+        } else {
+            return logicOffset;
+        }
+    }
+    public boolean isShouldDeleted() {
+        return endOffset == startOffset;
+    }
+
+    public boolean isEndOffsetDecided() {
+        //if the endOffset == startOffset, then the item should be deleted
+        return endOffset > startOffset;
+    }
+
+    public long convertOffsetDelta() {
+        return logicOffset - startOffset;
     }
 
     public int getGen() {
@@ -55,4 +80,13 @@ public class LogicQueueMappingItem {
     public long getStartOffset() {
         return startOffset;
     }
+
+    public long getEndOffset() {
+        return endOffset;
+    }
+
+
+    public void setEndOffset(long endOffset) {
+        this.endOffset = endOffset;
+    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java 
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java
new file mode 100644
index 0000000..50ac43e
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+public class TopicQueueMappingContext  {
+    private String topic;
+    private Integer globalId;
+    private Long globalOffset;
+    private TopicQueueMappingDetail mappingDetail;
+    private LogicQueueMappingItem mappingItem;
+
+    public TopicQueueMappingContext(String topic, Integer globalId, Long 
globalOffset, TopicQueueMappingDetail mappingDetail, LogicQueueMappingItem 
mappingItem) {
+        this.topic = topic;
+        this.globalId = globalId;
+        this.globalOffset = globalOffset;
+        this.mappingDetail = mappingDetail;
+        this.mappingItem = mappingItem;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public Integer getGlobalId() {
+        return globalId;
+    }
+
+    public void setGlobalId(Integer globalId) {
+        this.globalId = globalId;
+    }
+
+    public Long getGlobalOffset() {
+        return globalOffset;
+    }
+
+    public void setGlobalOffset(Long globalOffset) {
+        this.globalOffset = globalOffset;
+    }
+
+    public TopicQueueMappingDetail getMappingDetail() {
+        return mappingDetail;
+    }
+
+    public void setMappingDetail(TopicQueueMappingDetail mappingDetail) {
+        this.mappingDetail = mappingDetail;
+    }
+
+    public LogicQueueMappingItem getMappingItem() {
+        return mappingItem;
+    }
+
+    public void setMappingItem(LogicQueueMappingItem mappingItem) {
+        this.mappingItem = mappingItem;
+    }
+}
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java 
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index 0021310..8c2aad9 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -27,9 +27,6 @@ public class TopicQueueMappingDetail extends 
TopicQueueMappingInfo {
 
     // the mapping info in current broker, do not register to nameserver
     ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> 
hostedQueues = new ConcurrentHashMap<Integer, 
ImmutableList<LogicQueueMappingItem>>();
-    transient ConcurrentMap<Integer/*physicalId*/, Integer/*logicId*/> 
currIdMapRevert = new ConcurrentHashMap<Integer, Integer>();
-
-
 
     public TopicQueueMappingDetail(String topic, int totalQueues, String 
bname) {
         super(topic, totalQueues, bname);
@@ -48,7 +45,6 @@ public class TopicQueueMappingDetail extends 
TopicQueueMappingInfo {
     public void buildIdMap() {
         this.currIdMap = buildIdMap(LEVEL_0);
         this.prevIdMap = buildIdMap(LEVEL_1);
-        this.currIdMapRevert = revert(this.currIdMap);
     }
 
     public ConcurrentMap<Integer, Integer> revert(ConcurrentMap<Integer, 
Integer> original) {
@@ -103,16 +99,48 @@ public class TopicQueueMappingDetail extends 
TopicQueueMappingInfo {
             return -1;
         }
         if (bname.equals(mappingItems.get(mappingItems.size() - 
1).getBname())) {
-            return mappingItems.get(mappingItems.size() - 
1).convertToStaticLogicOffset(physicalLogicOffset);
+            return mappingItems.get(mappingItems.size() - 
1).convertToStaticQueueOffset(physicalLogicOffset);
         }
         //Consider the "switch" process, reduce the error
         if (mappingItems.size() >= 2
             && bname.equals(mappingItems.get(mappingItems.size() - 
2).getBname())) {
-            return mappingItems.get(mappingItems.size() - 
2).convertToStaticLogicOffset(physicalLogicOffset);
+            return mappingItems.get(mappingItems.size() - 
2).convertToStaticQueueOffset(physicalLogicOffset);
         }
         return -1;
     }
 
+    public LogicQueueMappingItem getLogicQueueMappingItem(Integer globalId, 
long logicOffset) {
+        List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
+        if (mappingItems == null
+                || mappingItems.isEmpty()) {
+            return null;
+        }
+        //Could use bi-search to polish performance
+        for (int i = mappingItems.size() - 1; i >= 0; i--) {
+            LogicQueueMappingItem item =  mappingItems.get(i);
+            if (logicOffset >= item.getLogicOffset()) {
+                return item;
+            }
+        }
+        //if not found, maybe out of range, return the first one
+        for (int i = 0; i < mappingItems.size(); i++) {
+            if (!mappingItems.get(i).isShouldDeleted()) {
+                return mappingItems.get(i);
+            }
+        }
+        return null;
+    }
+
+    public long getMaxOffsetFromMapping(Integer globalId) {
+        List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
+        if (mappingItems == null
+                || mappingItems.isEmpty()) {
+            return -1;
+        }
+        LogicQueueMappingItem item =  mappingItems.get(mappingItems.size() - 
1);
+        return item.convertToMaxStaticQueueOffset();
+    }
+
 
     public TopicQueueMappingInfo cloneAsMappingInfo() {
         TopicQueueMappingInfo topicQueueMappingInfo = new 
TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname);
@@ -122,13 +150,6 @@ public class TopicQueueMappingDetail extends 
TopicQueueMappingInfo {
         return topicQueueMappingInfo;
     }
 
-    public ConcurrentMap<Integer, Integer> getCurrIdMapRevert() {
-        return currIdMapRevert;
-    }
-
-    public void setCurrIdMapRevert(ConcurrentMap<Integer, Integer> 
currIdMapRevert) {
-        this.currIdMapRevert = currIdMapRevert;
-    }
 
     public int getTotalQueues() {
         return totalQueues;
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 106e89e..1bce01f 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -20,12 +20,12 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.RequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class PullMessageRequestHeader implements CommandCustomHeader {
+public class PullMessageRequestHeader extends RequestHeader {
     @CFNotNull
     private String consumerGroup;
     @CFNotNull
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
index 0112f7d..88af984 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
@@ -22,6 +22,7 @@ package org.apache.rocketmq.common.protocol.header;
 
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
 public class PullMessageResponseHeader implements CommandCustomHeader {
@@ -33,6 +34,8 @@ public class PullMessageResponseHeader implements 
CommandCustomHeader {
     private Long minOffset;
     @CFNotNull
     private Long maxOffset;
+    @CFNullable
+    private Long offsetDelta;
 
     @Override
     public void checkFields() throws RemotingCommandException {
@@ -69,4 +72,12 @@ public class PullMessageResponseHeader implements 
CommandCustomHeader {
     public void setSuggestWhichBrokerId(Long suggestWhichBrokerId) {
         this.suggestWhichBrokerId = suggestWhichBrokerId;
     }
+
+    public Long getOffsetDelta() {
+        return offsetDelta;
+    }
+
+    public void setOffsetDelta(Long offsetDelta) {
+        this.offsetDelta = offsetDelta;
+    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index 2df31e6..f9dcbff 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -20,12 +20,12 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.RequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class SendMessageRequestHeader implements CommandCustomHeader {
+public class SendMessageRequestHeader extends RequestHeader {
     @CFNotNull
     private String producerGroup;
     @CFNotNull
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/RequestHeader.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/RequestHeader.java
new file mode 100644
index 0000000..109fb9e
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RequestHeader.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting;
+
+public abstract class RequestHeader implements  CommandCustomHeader {
+    protected Boolean physical;
+
+    public Boolean getPhysical() {
+        return physical;
+    }
+
+    public void setPhysical(Boolean physical) {
+        this.physical = physical;
+    }
+}

Reply via email to