This is an automated email from the ASF dual-hosted git repository. dongeforever pushed a commit to branch 5.0.0-alpha in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit add737fdb9e984f175b42d3d19fff6d9c2085b39 Author: dongeforever <[email protected]> AuthorDate: Wed Jan 5 17:01:11 2022 +0800 Convert the consumer offset too --- .../broker/processor/AdminBrokerProcessor.java | 21 +++---- .../broker/processor/ConsumerManageProcessor.java | 68 +++++++++++++++++++++- .../broker/topic/TopicQueueMappingManager.java | 2 +- .../apache/rocketmq/common/rpc/RpcClientImpl.java | 25 ++++++++ ..._Topic_Logic_Queue_\350\256\276\350\256\241.md" | 9 ++- 5 files changed, 106 insertions(+), 19 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 6505263..568a728 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1176,8 +1176,6 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements continue; } - TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic); - { SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic); @@ -1208,26 +1206,21 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements // the consumerOffset cannot be zero for static topic because of the "double read check" strategy // just remain the logic for dynamic topic // maybe we should remove it in the future - if (mappingDetail == null) { - if (consumerOffset < 0) - consumerOffset = 0; - } + if (consumerOffset < 0) + consumerOffset = 0; offsetWrapper.setBrokerOffset(brokerOffset); offsetWrapper.setConsumerOffset(consumerOffset); // the consumeOffset is not in this broker for static topic // and may get the wrong result - if (mappingDetail == null) { - long timeOffset = consumerOffset - 1; - if 
(timeOffset >= 0) { - long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset); - if (lastTimestamp > 0) { - offsetWrapper.setLastTimestamp(lastTimestamp); - } + long timeOffset = consumerOffset - 1; + if (timeOffset >= 0) { + long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset); + if (lastTimestamp > 0) { + offsetWrapper.setLastTimestamp(lastTimestamp); } } - consumeStats.getOffsetTable().put(mq, offsetWrapper); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java index 04e705b..a266442 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java @@ -29,11 +29,15 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHead import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader; +import org.apache.rocketmq.common.rpc.RpcClientUtils; import org.apache.rocketmq.common.rpc.RpcRequest; import org.apache.rocketmq.common.rpc.RpcResponse; +import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; +import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import 
org.apache.rocketmq.remoting.common.RemotingHelper; @@ -110,6 +114,37 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen return response; } + public RemotingCommand rewriteRequestForStaticTopic(final UpdateConsumerOffsetRequestHeader requestHeader, final TopicQueueMappingContext mappingContext) { + try { + if (mappingContext.getMappingDetail() == null) { + return null; + } + TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); + if (!mappingContext.isLeader()) { + return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname())); + } + Long globalOffset = requestHeader.getCommitOffset(); + LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), globalOffset, true); + requestHeader.setQueueId(mappingItem.getQueueId()); + requestHeader.setLo(false); + requestHeader.setBname(mappingItem.getBname()); + requestHeader.setCommitOffset(mappingItem.computePhysicalQueueOffset(globalOffset)); + //leader, let it go, do not need to rewrite the response + if (mappingDetail.getBname().equals(mappingItem.getBname())) { + return null; + } + RpcRequest rpcRequest = new RpcRequest(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader, null); + RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); + if (rpcResponse.getException() != null) { + throw rpcResponse.getException(); + } + return RpcClientUtils.createCommandForRpcResponse(rpcResponse); + } catch (Throwable t) { + return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); + } + } + + private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand 
response = @@ -119,7 +154,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader); - RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext); + RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); if (rewriteResult != null) { return rewriteResult; } @@ -144,6 +179,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen if (mappingItemList.size() == 1 && mappingItemList.get(0).getLogicOffset() == 0) { //as physical, just let it go + mappingContext.setCurrentItem(mappingItemList.get(0)); requestHeader.setQueueId(mappingContext.getLeaderItem().getQueueId()); return null; } @@ -154,6 +190,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen //double read, first from leader, then from second leader for (int i = itemList.size() - 1; i >= 0; i--) { LogicQueueMappingItem mappingItem = itemList.get(i); + mappingContext.setCurrentItem(mappingItem); if (mappingItem.getBname().equals(mappingDetail.getBname())) { offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), mappingItem.getQueueId()); if (offset >= 0) { @@ -194,9 +231,31 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen response.setCode(ResponseCode.QUERY_NOT_FOUND); response.setRemark("Not found, maybe this group consumer boot first"); } + RemotingCommand rewriteResponseResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, response.getCode()); + if (rewriteResponseResult != null) { + return rewriteResponseResult; + } return response; } catch (Throwable t) { - 
t.printStackTrace(); + return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); + } + } + + + public RemotingCommand rewriteResponseForStaticTopic(final QueryConsumerOffsetRequestHeader requestHeader, final QueryConsumerOffsetResponseHeader responseHeader, + final TopicQueueMappingContext mappingContext, final int code) { + try { + if (mappingContext.getMappingDetail() == null) { + return null; + } + if (code != ResponseCode.SUCCESS) { + return null; + } + LogicQueueMappingItem item = mappingContext.getCurrentItem(); + responseHeader.setOffset(item.computeStaticQueueOffsetStrictly(responseHeader.getOffset())); + //no need to construct new object + return null; + } catch (Throwable t) { return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); } } @@ -245,6 +304,11 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen } } + RemotingCommand rewriteResponseResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, response.getCode()); + if (rewriteResponseResult != null) { + return rewriteResponseResult; + } + return response; } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index 56fc792..dd7e708 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -189,7 +189,7 @@ public class TopicQueueMappingManager extends ConfigManager { //Do not return a null context public TopicQueueMappingContext buildTopicQueueMappingContext(TopicRequestHeader requestHeader, boolean selectOneWhenMiss) { - //should disable logic queue explicitly, otherwise the old client may cause dirty data to newly created static topic + // if lo is set to false explicitly, it maybe the forwarded request if (requestHeader.getLo() != null && 
Boolean.FALSE.equals(requestHeader.getLo())) { return new TopicQueueMappingContext(requestHeader.getTopic(), null, null, null, null); diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java index 62e6ec1..3782ab0 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java @@ -28,6 +28,8 @@ import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RemotingClient; @@ -101,6 +103,9 @@ public class RpcClientImpl implements RpcClient { case RequestCode.QUERY_CONSUMER_OFFSET: rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs); break; + case RequestCode.UPDATE_CONSUMER_OFFSET: + rpcResponsePromise = handleUpdateConsumerOffset(addr, request, timeoutMs); + break; case RequestCode.GET_TOPIC_STATS_INFO: rpcResponsePromise = handleCommonBodyRequest(addr, request, timeoutMs, TopicStatsTable.class); break; @@ -234,6 +239,26 @@ public class RpcClientImpl implements RpcClient { return rpcResponsePromise; } + public Promise<RpcResponse> handleUpdateConsumerOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception { + final Promise<RpcResponse> rpcResponsePromise = createResponseFuture(); + + RemotingCommand requestCommand = 
RpcClientUtils.createCommandForRpcRequest(rpcRequest); + RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); + assert responseCommand != null; + switch (responseCommand.getCode()) { + case ResponseCode.SUCCESS: { + UpdateConsumerOffsetResponseHeader responseHeader = + (UpdateConsumerOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(UpdateConsumerOffsetResponseHeader.class); + rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody())); + break; + } + default: { + rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"))); + } + } + return rpcResponsePromise; + } + public Promise<RpcResponse> handleCommonBodyRequest(final String addr, RpcRequest rpcRequest, long timeoutMillis, Class bodyClass) throws Exception { final Promise<RpcResponse> rpcResponsePromise = createResponseFuture(); RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest); diff --git "a/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md" "b/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md" index c06d83f..ac1cc6b 100644 --- "a/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md" +++ "b/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md" @@ -7,6 +7,8 @@ | 2021-12-03 | 增加代码走读的说明| dongforever | | 2021-12-10 | 引入Scope概念,保留『多集群动态零耦合』的集群设计模型 | dongforever | | 2021-12-23 | 梳理待完成事项;讨论Admin接口的适配方式 | dongforever | +| 2022-01-05 | Offset存储改成『转换制』,以更好适配原有逻辑 | dongforever | + @@ -342,8 +344,9 @@ UpdateStaticTopic 命令会自动计算预期的分布情况,包括但不限 #### consumerOffsets 系列 -Offset的存储,无需转换,直接存储在 LogicQueue 所对应的最新 PhysicalQueue 中。 -读取时,采取『Double-Read-Check』机制。 +Offset的存储,进行转换,存储在对应PhysicalQueue 所在的 Broker上面。 +读取时,采取『Double-Read-Check』机制,并进行转换。 +这样可以最大程度与 PhysicalQueue 的相关逻辑进行适配,比如 ConsumerProgress
可以看到『最近拉取时间』。 #### Client @@ -454,6 +457,8 @@ User 接口,使用范围广泛如多语言等,应该尽可能简单,把适 #### 阻止Pop模式、事务消息、定时消息使用 LogicQueue 不兼容 事务消息和定时消息。 LogicQueue 当前不支持Pop模式消费。 +#### Nameserver 相关生命周期完善 +目前没有处理Nameserver中Mapping数据的生命周期 #### ConsumeQueue 的 correctMinOffset 逻辑存在缺陷 可能导致 LogicQueue 无法清除已经过期的 MappingItem。 #### getOffsetInQueueByTime 语义有缺陷
