This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch feature/notification in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit cc33e4eb2af4907c063f5d83c4006550d1880160 Author: zhouxiang <[email protected]> AuthorDate: Mon Jan 16 11:42:36 2023 +0800 [ISSUE #5847] Refactor logic in NotificationProcessor * Fix hasMsg logic in NotificationProcessor * Add notify in PopReviveService and order ack situation * Refactor wakeup logic --- .../apache/rocketmq/broker/BrokerController.java | 4 ++ .../broker/processor/AckMessageProcessor.java | 11 ++- .../broker/processor/NotificationProcessor.java | 81 +++++++++------------- .../broker/processor/PopMessageProcessor.java | 1 + .../broker/processor/PopReviveService.java | 2 + 5 files changed, 49 insertions(+), 50 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index eb9f629b5..8b3dbdfa9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1228,6 +1228,10 @@ public class BrokerController { return popMessageProcessor; } + public NotificationProcessor getNotificationProcessor() { + return notificationProcessor; + } + public TimerMessageStore getTimerMessageStore() { return timerMessageStore; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java index 80f06aed0..7b366cb8b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java @@ -141,6 +141,7 @@ public class AckMessageProcessor implements NettyRequestProcessor { ackMsg.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo)); int rqId = ExtraInfoUtil.getReviveQid(extraInfo); + long invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo); this.brokerController.getBrokerStatsManager().incBrokerAckNums(1);
this.brokerController.getBrokerStatsManager().incGroupAckNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), 1); @@ -171,8 +172,12 @@ public class AckMessageProcessor implements NettyRequestProcessor { requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), nextOffset); - this.brokerController.getPopMessageProcessor().notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(), - requestHeader.getQueueId()); + if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getTopic(), + requestHeader.getConsumerGroup(), requestHeader.getQueueId(), invisibleTime)) { + this.brokerController.getPopMessageProcessor().notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(), + requestHeader.getQueueId()); + this.brokerController.getNotificationProcessor().notifyMessageArriving(requestHeader.getTopic(), requestHeader.getQueueId()); + } } else if (nextOffset == -1) { String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s", lockKey, oldOffset, requestHeader.getOffset(), nextOffset, channel.remoteAddress()); @@ -201,7 +206,7 @@ public class AckMessageProcessor implements NettyRequestProcessor { msgInner.setBornTimestamp(System.currentTimeMillis()); msgInner.setBornHost(this.brokerController.getStoreHost()); msgInner.setStoreHost(this.brokerController.getStoreHost()); - msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + ExtraInfoUtil.getInvisibleTime(extraInfo)); + msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + invisibleTime); msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg)); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java index 332843b9f..e10d72328 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java @@ -74,7 +74,7 @@ public class NotificationProcessor implements NettyRequestProcessor { break; } POP_LOGGER.info("timeout , wakeUp Notification : {}", tmPopRequest); - wakeUp(tmPopRequest, false); + wakeUp(tmPopRequest); tmPopRequest = popQ.peek(); } else { break; @@ -111,28 +111,25 @@ public class NotificationProcessor implements NettyRequestProcessor { } public void notifyMessageArriving(final String topic, final int queueId) { - ArrayBlockingQueue<NotificationRequest> remotingCommands = pollingMap.get(KeyBuilder.buildPollingNotificationKey(topic, -1)); - if (remotingCommands != null) { - List<NotificationRequest> c = new ArrayList<>(); - remotingCommands.drainTo(c); - for (NotificationRequest notificationRequest : c) { - POP_LOGGER.info("new msg arrive , wakeUp : {}", notificationRequest); - wakeUp(notificationRequest, true); - - } + notifyMessageArrivingForQueue(topic, -1); + if (queueId > 0) { + notifyMessageArrivingForQueue(topic, queueId); } - remotingCommands = pollingMap.get(KeyBuilder.buildPollingNotificationKey(topic, queueId)); + } + + public void notifyMessageArrivingForQueue(final String topic, final int queueId) { + ArrayBlockingQueue<NotificationRequest> remotingCommands = pollingMap.get(KeyBuilder.buildPollingNotificationKey(topic, queueId)); if (remotingCommands != null) { List<NotificationRequest> c = new ArrayList<>(); remotingCommands.drainTo(c); for (NotificationRequest notificationRequest : c) { POP_LOGGER.info("new msg arrive , wakeUp : {}", notificationRequest); - wakeUp(notificationRequest, true); + wakeUp(notificationRequest); } } } - private void 
wakeUp(final NotificationRequest request, final boolean hasMsg) { + private void wakeUp(final NotificationRequest request) { if (request == null || !request.complete()) { return; } @@ -142,11 +139,7 @@ public class NotificationProcessor implements NettyRequestProcessor { Runnable run = () -> { try { final RemotingCommand response; - if (hasMsg) { - response = NotificationProcessor.this.responseNotification(request.getChannel(), true); - } else { - response = NotificationProcessor.this.processRequest(request.getChannel(), request.getRemotingCommand()); - } + response = NotificationProcessor.this.processRequest(request.getChannel(), request.getRemotingCommand()); if (response != null) { response.setOpaque(request.getRemotingCommand().getOpaque()); response.markResponseType(); @@ -165,14 +158,6 @@ public class NotificationProcessor implements NettyRequestProcessor { this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, request.getChannel(), request.getRemotingCommand())); } - public RemotingCommand responseNotification(final Channel channel, boolean hasMsg) { - RemotingCommand response = RemotingCommand.createResponseCommand(NotificationResponseHeader.class); - final NotificationResponseHeader responseHeader = (NotificationResponseHeader) response.readCustomHeader(); - responseHeader.setHasMsg(hasMsg); - response.setCode(ResponseCode.SUCCESS); - return response; - } - private RemotingCommand processRequest(final Channel channel, RemotingCommand request) throws RemotingCommandException { RemotingCommand response = RemotingCommand.createResponseCommand(NotificationResponseHeader.class); @@ -237,31 +222,33 @@ public class NotificationProcessor implements NettyRequestProcessor { } } } - if (!hasMsg && requestHeader.getQueueId() < 0) { - // read all queue - for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { - int queueId = (randomQ + i) % topicConfig.getReadQueueNums(); - hasMsg = hasMsgFromQueue(false, requestHeader, queueId); - if (hasMsg) { 
- break; - } - } - } else { - int queueId = requestHeader.getQueueId(); - hasMsg = hasMsgFromQueue(false, requestHeader, queueId); - } - // if it doesn't have message, fetch retry again - if (!needRetry && !hasMsg) { - TopicConfig retryTopicConfig = - this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup())); - if (retryTopicConfig != null) { - for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) { - int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums(); - hasMsg = hasMsgFromQueue(true, requestHeader, queueId); + if (!hasMsg) { + if (requestHeader.getQueueId() < 0) { + // read all queue + for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { + int queueId = (randomQ + i) % topicConfig.getReadQueueNums(); + hasMsg = hasMsgFromQueue(false, requestHeader, queueId); if (hasMsg) { break; } } + } else { + int queueId = requestHeader.getQueueId(); + hasMsg = hasMsgFromQueue(false, requestHeader, queueId); + } + // if it doesn't have message, fetch retry again + if (!needRetry && !hasMsg) { + TopicConfig retryTopicConfig = + this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup())); + if (retryTopicConfig != null) { + for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) { + int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums(); + hasMsg = hasMsgFromQueue(true, requestHeader, queueId); + if (hasMsg) { + break; + } + } + } } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 2bea535f4..e2d51857e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -181,6 +181,7 @@ public 
class PopMessageProcessor implements NettyRequestProcessor { // notify pop queue notifySuccess = this.brokerController.getPopMessageProcessor().notifyMessageArriving(topic, group, queueId); } + this.brokerController.getNotificationProcessor().notifyMessageArriving(topic, queueId); if (this.brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("notify long polling request. topic:{}, group:{}, queueId:{}, success:{}", topic, group, queueId, notifySuccess); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index 52b848b07..d0e9dbc36 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -140,6 +140,8 @@ public class PopReviveService extends ServiceThread { popCheckPoint.getCId(), -1 ); + brokerController.getNotificationProcessor().notifyMessageArriving( + KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1); } return true; }
