http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java new file mode 100644 index 0000000..bdceeb0 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java @@ -0,0 +1,164 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.broker.processor;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
import org.apache.rocketmq.common.protocol.header.UnregisterClientResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Request processor for the {@code HEART_BEAT} and {@code UNREGISTER_CLIENT}
 * request codes: it registers the producers/consumers announced in a heartbeat
 * and removes them again on unregistration.
 *
 * @author shijia.wxr
 */
public class ClientManageProcessor implements NettyRequestProcessor {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;

    public ClientManageProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * Routes the request to the handler matching its request code.
     *
     * @param ctx     channel context the request arrived on
     * @param request the incoming command
     * @return the handler's response, or {@code null} when the code is neither
     *         {@code HEART_BEAT} nor {@code UNREGISTER_CLIENT}
     * @throws RemotingCommandException propagated from {@link #unregisterClient}
     */
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        final int code = request.getCode();
        if (code == RequestCode.HEART_BEAT) {
            return this.heartBeat(ctx, request);
        }
        if (code == RequestCode.UNREGISTER_CLIENT) {
            return this.unregisterClient(ctx, request);
        }
        return null;
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean rejectRequest() {
        return false;
    }

    /**
     * Decodes the heartbeat body and registers every consumer and producer it
     * lists against the sending channel.
     *
     * <p>For a consumer whose subscription group config is found, the group's
     * retry topic is created first and the group's "notify consumer ids
     * changed" setting is used; without a config that setting defaults to
     * {@code true}.
     *
     * @param ctx     channel context of the client
     * @param request heartbeat command whose body carries the {@link HeartbeatData}
     * @return a response with code {@code SUCCESS} and a {@code null} remark
     */
    public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand reply = RemotingCommand.createResponseCommand(null);
        HeartbeatData heartbeat = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
        ClientChannelInfo channelInfo =
            new ClientChannelInfo(ctx.channel(), heartbeat.getClientID(), request.getLanguage(), request.getVersion());

        for (ConsumerData consumer : heartbeat.getConsumerDataSet()) {
            SubscriptionGroupConfig groupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(consumer.getGroupName());

            boolean notifyOnChange = true;
            if (groupConfig != null) {
                notifyOnChange = groupConfig.isNotifyConsumerIdsChangedEnable();
                this.createRetryTopic(consumer, groupConfig);
            }

            if (this.brokerController.getConsumerManager().registerConsumer(
                consumer.getGroupName(),
                channelInfo,
                consumer.getConsumeType(),
                consumer.getMessageModel(),
                consumer.getConsumeFromWhere(),
                consumer.getSubscriptionDataSet(),
                notifyOnChange)) {
                log.info("registerConsumer info changed {} {}",
                    consumer.toString(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            }
        }

        for (ProducerData producer : heartbeat.getProducerDataSet()) {
            this.brokerController.getProducerManager().registerProducer(producer.getGroupName(), channelInfo);
        }

        reply.setCode(ResponseCode.SUCCESS);
        reply.setRemark(null);
        return reply;
    }

    /**
     * Removes the client from the producer and/or consumer group named in the
     * request header; a group that is {@code null} in the header is skipped.
     *
     * @param ctx     channel context of the client
     * @param request unregister command carrying an {@link UnregisterClientRequestHeader}
     * @return a response with code {@code SUCCESS} and a {@code null} remark
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        final RemotingCommand reply = RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
        final UnregisterClientRequestHeader header =
            (UnregisterClientRequestHeader) request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);

        ClientChannelInfo channelInfo =
            new ClientChannelInfo(ctx.channel(), header.getClientID(), request.getLanguage(), request.getVersion());

        final String producerGroup = header.getProducerGroup();
        if (producerGroup != null) {
            this.brokerController.getProducerManager().unregisterProducer(producerGroup, channelInfo);
        }

        final String consumerGroup = header.getConsumerGroup();
        if (consumerGroup != null) {
            SubscriptionGroupConfig groupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(consumerGroup);
            // Without a group config the notification stays enabled.
            boolean notifyOnChange = true;
            if (groupConfig != null) {
                notifyOnChange = groupConfig.isNotifyConsumerIdsChangedEnable();
            }
            this.brokerController.getConsumerManager().unregisterConsumer(consumerGroup, channelInfo, notifyOnChange);
        }

        reply.setCode(ResponseCode.SUCCESS);
        reply.setRemark(null);
        return reply;
    }

    /**
     * Creates the retry topic of the consumer's group, readable and writable,
     * with the queue count taken from the group config. The unit sys-flag is
     * set only when the consumer reports unit mode.
     */
    private void createRetryTopic(ConsumerData consumer, SubscriptionGroupConfig groupConfig) {
        int sysFlag = consumer.isUnitMode() ? TopicSysFlag.buildSysFlag(false, true) : 0;
        String retryTopic = MixAll.getRetryTopic(consumer.getGroupName());
        this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
            retryTopic,
            groupConfig.getRetryQueueNums(),
            PermName.PERM_WRITE | PermName.PERM_READ,
            sysFlag);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java new file mode 100644 index 0000000..09a2607 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java @@ -0,0 +1,157 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.broker.processor;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.common.protocol.header.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;


/**
 * Request processor for consumer bookkeeping commands: listing the client ids
 * of a consumer group, committing a consumer offset and querying one.
 *
 * @author shijia.wxr
 */
public class ConsumerManageProcessor implements NettyRequestProcessor {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

    private final BrokerController brokerController;

    public ConsumerManageProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * Routes the request to the handler matching its request code.
     *
     * @param ctx     channel context the request arrived on
     * @param request the incoming command
     * @return the handler's response, or {@code null} for a code other than
     *         {@code GET_CONSUMER_LIST_BY_GROUP}, {@code UPDATE_CONSUMER_OFFSET}
     *         or {@code QUERY_CONSUMER_OFFSET}
     * @throws RemotingCommandException if the selected handler cannot decode the header
     */
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        final int code = request.getCode();
        if (code == RequestCode.GET_CONSUMER_LIST_BY_GROUP) {
            return this.getConsumerListByGroup(ctx, request);
        }
        if (code == RequestCode.UPDATE_CONSUMER_OFFSET) {
            return this.updateConsumerOffset(ctx, request);
        }
        if (code == RequestCode.QUERY_CONSUMER_OFFSET) {
            return this.queryConsumerOffset(ctx, request);
        }
        return null;
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean rejectRequest() {
        return false;
    }

    /**
     * Answers with the client ids currently known for the requested consumer group.
     *
     * @param ctx     channel context of the caller (used for the warning logs)
     * @param request command carrying a {@link GetConsumerListByGroupRequestHeader}
     * @return {@code SUCCESS} with the encoded id list as body when the group is
     *         known and has at least one client id; otherwise {@code SYSTEM_ERROR}
     *         with a remark naming the group
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        final RemotingCommand reply =
            RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
        final GetConsumerListByGroupRequestHeader header =
            (GetConsumerListByGroupRequestHeader) request
                .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);

        ConsumerGroupInfo groupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(header.getConsumerGroup());

        if (groupInfo == null) {
            log.warn("getConsumerGroupInfo failed, {} {}", header.getConsumerGroup(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        } else {
            List<String> ids = groupInfo.getAllClientId();
            if (ids.isEmpty()) {
                log.warn("getAllClientId failed, {} {}", header.getConsumerGroup(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            } else {
                GetConsumerListByGroupResponseBody payload = new GetConsumerListByGroupResponseBody();
                payload.setConsumerIdList(ids);
                reply.setBody(payload.encode());
                reply.setCode(ResponseCode.SUCCESS);
                reply.setRemark(null);
                return reply;
            }
        }

        // Both failure branches above end up here.
        reply.setCode(ResponseCode.SYSTEM_ERROR);
        reply.setRemark("no consumer for this group, " + header.getConsumerGroup());
        return reply;
    }

    /**
     * Commits the offset from the request header to the consumer offset
     * manager, passing the caller's remote address along.
     *
     * @return a response with code {@code SUCCESS} and a {@code null} remark
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        final RemotingCommand reply =
            RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
        final UpdateConsumerOffsetRequestHeader header =
            (UpdateConsumerOffsetRequestHeader) request
                .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);

        this.brokerController.getConsumerOffsetManager().commitOffset(
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            header.getConsumerGroup(),
            header.getTopic(),
            header.getQueueId(),
            header.getCommitOffset());

        reply.setCode(ResponseCode.SUCCESS);
        reply.setRemark(null);
        return reply;
    }

    /**
     * Looks up the stored consume offset for group/topic/queue.
     *
     * <p>A stored offset {@code >= 0} is returned as is. Otherwise offset 0 is
     * returned when the queue's minimum offset is {@code <= 0} and the message
     * store reports that offset 0 is not on disk; in every other case the
     * response code is {@code QUERY_NOT_FOUND}.
     *
     * @return the response command described above
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        final RemotingCommand reply =
            RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
        final QueryConsumerOffsetResponseHeader replyHeader =
            (QueryConsumerOffsetResponseHeader) reply.readCustomHeader();
        final QueryConsumerOffsetRequestHeader header =
            (QueryConsumerOffsetRequestHeader) request
                .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);

        long stored = this.brokerController.getConsumerOffsetManager().queryOffset(
            header.getConsumerGroup(), header.getTopic(), header.getQueueId());

        if (stored >= 0) {
            replyHeader.setOffset(stored);
            reply.setCode(ResponseCode.SUCCESS);
            reply.setRemark(null);
            return reply;
        }

        long minOffset = this.brokerController.getMessageStore().getMinOffsetInQuque(
            header.getTopic(), header.getQueueId());
        // Short-circuit kept: the disk check only runs when minOffset <= 0.
        boolean startFromZero = minOffset <= 0
            && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
                header.getTopic(), header.getQueueId(), 0);

        if (startFromZero) {
            replyHeader.setOffset(0L);
            reply.setCode(ResponseCode.SUCCESS);
            reply.setRemark(null);
        } else {
            reply.setCode(ResponseCode.QUERY_NOT_FOUND);
            reply.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
        }
        return reply;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java new file mode 100644 index 0000000..fc38238 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java @@ -0,0 
+1,236 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.broker.processor;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Request processor that ends a prepared transaction: it looks up the
 * prepared message by commit-log offset, checks it against the request header
 * and stores a follow-up message carrying the commit or rollback flag.
 *
 * @author shijia.wxr
 */
public class EndTransactionProcessor implements NettyRequestProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);

    /** Placeholder tail appended to every transaction-state log message. */
    private static final String STATE_LOG_SUFFIX = "RequestHeader: {} Remark: {}";

    private final BrokerController brokerController;

    public EndTransactionProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * Handles an end-transaction request.
     *
     * @param ctx     channel context of the producer
     * @param request command carrying an {@link EndTransactionRequestHeader}
     * @return {@code null} when the reported state is the pending
     *         ({@code TRANSACTION_NOT_TYPE}) state or an unrecognised value;
     *         otherwise a response whose code reflects the outcome of the
     *         validation and of storing the follow-up message
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final EndTransactionRequestHeader requestHeader =
            (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);

        if (!this.isCommitOrRollback(ctx, request, requestHeader)) {
            return null;
        }

        final MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getCommitLogOffset());
        if (msgExt == null) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("find prepared transaction message failed");
            return response;
        }

        final String mismatch = this.findMismatch(msgExt, requestHeader);
        if (mismatch != null) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(mismatch);
            return response;
        }

        MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt);
        msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));

        msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
        msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
        msgInner.setStoreTimestamp(msgExt.getStoreTimestamp());
        if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            // A rollback record is stored without the message body.
            msgInner.setBody(null);
        }

        final MessageStore messageStore = this.brokerController.getMessageStore();
        final PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
        if (putMessageResult == null) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("store putMessage return null");
            return response;
        }

        this.applyPutResult(response, putMessageResult);
        return response;
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean rejectRequest() {
        return false;
    }

    /**
     * Logs the reported transaction state and tells whether processing should
     * continue.
     *
     * @return {@code true} for a commit or rollback state, {@code false} for
     *         the pending state or any other value
     */
    private boolean isCommitOrRollback(ChannelHandlerContext ctx, RemotingCommand request,
        EndTransactionRequestHeader requestHeader) {
        if (requestHeader.getFromTransactionCheck()) {
            switch (requestHeader.getCommitOrRollback()) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                    this.warnState("check producer[{}] transaction state, but it's pending status.", ctx, request, requestHeader);
                    return false;
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    this.warnState("check producer[{}] transaction state, the producer commit the message.", ctx, request, requestHeader);
                    return true;
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    this.warnState("check producer[{}] transaction state, the producer rollback the message.", ctx, request, requestHeader);
                    return true;
                default:
                    return false;
            }
        }

        switch (requestHeader.getCommitOrRollback()) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
                this.warnState("the producer[{}] end transaction in sending message, and it's pending status.", ctx, request, requestHeader);
                return false;
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                // A commit outside a transaction check is not logged.
                return true;
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                this.warnState("the producer[{}] end transaction in sending message, rollback the message.", ctx, request, requestHeader);
                return true;
            default:
                return false;
        }
    }

    /**
     * Emits one transaction-state warning: remote address, request header and
     * request remark are appended to {@code message}.
     */
    private void warnState(String message, ChannelHandlerContext ctx, RemotingCommand request,
        EndTransactionRequestHeader requestHeader) {
        LOGGER.warn(message + STATE_LOG_SUFFIX,
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            requestHeader.toString(),
            request.getRemark());
    }

    /**
     * Compares the prepared message with the request header.
     *
     * @return the error remark of the first mismatch (producer group, then
     *         queue offset, then commit-log offset), or {@code null} when all
     *         three agree
     */
    private String findMismatch(MessageExt msgExt, EndTransactionRequestHeader requestHeader) {
        final String pgroupRead = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        // A message without the producer-group property is treated as a
        // mismatch rather than dereferenced, which would throw a
        // NullPointerException.
        if (pgroupRead == null || !pgroupRead.equals(requestHeader.getProducerGroup())) {
            return "the producer group wrong";
        }
        if (msgExt.getQueueOffset() != requestHeader.getTranStateTableOffset()) {
            return "the transaction state table offset wrong";
        }
        if (msgExt.getCommitLogOffset() != requestHeader.getCommitLogOffset()) {
            return "the commit log offset wrong";
        }
        return null;
    }

    /**
     * Translates the store's put status into the response code and remark.
     */
    private void applyPutResult(RemotingCommand response, PutMessageResult putMessageResult) {
        switch (putMessageResult.getPutMessageStatus()) {
            // Success
            case PUT_OK:
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);
                break;
            // Failed
            case CREATE_MAPEDFILE_FAILED:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("create maped file failed.");
                break;
            case MESSAGE_ILLEGAL:
            case PROPERTIES_SIZE_EXCEEDED:
                response.setCode(ResponseCode.MESSAGE_ILLEGAL);
                response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
                break;
            case SERVICE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
                response.setRemark("service not available now.");
                break;
            case OS_PAGECACHE_BUSY:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("OS page cache busy, please try another machine");
                break;
            case UNKNOWN_ERROR:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UNKNOWN_ERROR");
                break;
            default:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UNKNOWN_ERROR DEFAULT");
                break;
        }
    }

    /**
     * Builds the broker-internal message that ends the transaction by copying
     * body, flag, properties, sys-flag, timestamps, hosts, reconsume count,
     * topic and queue id from the prepared message.
     *
     * <p>The copy has {@code waitStoreMsgOK} set to {@code false} and its
     * delay-time-level property cleared.
     *
     * @param msgExt the prepared message read from the store
     * @return the new message; the caller still has to set the transaction
     *         flag and offsets
     */
    private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());

        // Use the prepared message's sys-flag: msgInner's own flag has not been
        // copied yet at this point, so testing it would never see MULTI_TAGS_FLAG.
        final int sysFlag = msgExt.getSysFlag();
        TopicFilterType topicFilterType =
            (sysFlag & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
                : TopicFilterType.SINGLE_TAG;
        long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
        msgInner.setTagsCode(tagsCodeValue);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

        msgInner.setSysFlag(sysFlag);
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());

        msgInner.setWaitStoreMsgOK(false);
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

        msgInner.setTopic(msgExt.getTopic());
        msgInner.setQueueId(msgExt.getQueueId());

        return msgInner;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java new file mode 100644 index 0000000..acf25ea --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.rocketmq.broker.processor;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Placeholder request processor: it accepts every request and produces no
 * response command.
 *
 * @author shijia.wxr
 */
public class ForwardRequestProcessor implements NettyRequestProcessor {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

    private final BrokerController brokerController;

    public ForwardRequestProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean rejectRequest() {
        return false;
    }

    /**
     * Ignores the request.
     *
     * @param ctx     channel context the request arrived on (unused)
     * @param request the incoming command (unused)
     * @return always {@code null}
     */
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
        return null;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java new file mode 100644 index 0000000..3094079 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -0,0 +1,542 @@ +/** + * Licensed 
to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.processor; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.client.ConsumerGroupInfo; +import org.apache.rocketmq.broker.longpolling.PullRequest; +import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; +import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; +import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.TopicFilterType; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.filter.FilterAPI; +import org.apache.rocketmq.common.help.FAQUrl; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import 
org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.common.sysflag.PullSysFlag; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.store.GetMessageResult; +import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.config.BrokerRole; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import io.netty.channel.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class PullMessageProcessor implements NettyRequestProcessor { + private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private final BrokerController brokerController; + private List<ConsumeMessageHook> consumeMessageHookList; + + public PullMessageProcessor(final BrokerController brokerController) { + this.brokerController = brokerController; + } + + @Override + public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + return this.processRequest(ctx.channel(), request, true); + } + + @Override + public boolean rejectRequest() { + return false; + } + + private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) + throws RemotingCommandException { + RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); + final PullMessageResponseHeader 
responseHeader = (PullMessageResponseHeader) response.readCustomHeader(); + final PullMessageRequestHeader requestHeader = + (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class); + + + response.setOpaque(request.getOpaque()); + + if (LOG.isDebugEnabled()) { + LOG.debug("receive PullMessage request command, " + request); + } + + + if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) { + response.setCode(ResponseCode.NO_PERMISSION); + response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] pulling message is forbidden"); + return response; + } + + + SubscriptionGroupConfig subscriptionGroupConfig = + this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); + if (null == subscriptionGroupConfig) { + response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); + response.setRemark("subscription group not exist, " + requestHeader.getConsumerGroup() + " " + + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); + return response; + } + + + if (!subscriptionGroupConfig.isConsumeEnable()) { + response.setCode(ResponseCode.NO_PERMISSION); + response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup()); + return response; + } + + final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag()); + final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag()); + final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag()); + + final long suspendTimeoutMillisLong = hasSuspendFlag ? 
requestHeader.getSuspendTimeoutMillis() : 0; + + + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); + if (null == topicConfig) { + LOG.error("the topic " + requestHeader.getTopic() + " not exist, consumer: " + RemotingHelper.parseChannelRemoteAddr(channel)); + response.setCode(ResponseCode.TOPIC_NOT_EXIST); + response.setRemark( + "topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); + return response; + } + + + if (!PermName.isReadable(topicConfig.getPerm())) { + response.setCode(ResponseCode.NO_PERMISSION); + response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden"); + return response; + } + + + if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) { + String errorInfo = "queueId[" + requestHeader.getQueueId() + "] is illagal,Topic :" + requestHeader.getTopic() + + " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress(); + LOG.warn(errorInfo); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(errorInfo); + return response; + } + + + SubscriptionData subscriptionData = null; + if (hasSubscriptionFlag) { + try { + subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(), + requestHeader.getSubscription()); + } catch (Exception e) { + LOG.warn("parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), // + requestHeader.getConsumerGroup()); + response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); + response.setRemark("parse the consumer's subscription failed"); + return response; + } + } else { + ConsumerGroupInfo consumerGroupInfo = + this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup()); + if (null == consumerGroupInfo) { + LOG.warn("the consumer's group 
info not exist, group: {}", requestHeader.getConsumerGroup()); + response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST); + response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC)); + return response; + } + + if (!subscriptionGroupConfig.isConsumeBroadcastEnable() // + && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) { + response.setCode(ResponseCode.NO_PERMISSION); + response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way"); + return response; + } + + subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic()); + if (null == subscriptionData) { + LOG.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic()); + response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST); + response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC)); + return response; + } + + + if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) { + LOG.warn("the broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(), + subscriptionData.getSubString()); + response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST); + response.setRemark("the consumer's subscription not latest"); + return response; + } + } + + final GetMessageResult getMessageResult = + this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), + requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData); + if (getMessageResult != null) { + response.setRemark(getMessageResult.getStatus().name()); + responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset()); + responseHeader.setMinOffset(getMessageResult.getMinOffset()); + responseHeader.setMaxOffset(getMessageResult.getMaxOffset()); + + + if 
(getMessageResult.isSuggestPullingFromSlave()) { + responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); + } else { + responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); + } + + switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) { + case ASYNC_MASTER: + case SYNC_MASTER: + break; + case SLAVE: + if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) { + response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); + responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); + } + break; + } + + if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) { + // consume too slow ,redirect to another machine + if (getMessageResult.isSuggestPullingFromSlave()) { + responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); + } + // consume ok + else { + responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId()); + } + } else { + responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); + } + + switch (getMessageResult.getStatus()) { + case FOUND: + response.setCode(ResponseCode.SUCCESS); + break; + case MESSAGE_WAS_REMOVING: + response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); + break; + case NO_MATCHED_LOGIC_QUEUE: + case NO_MESSAGE_IN_QUEUE: + if (0 != requestHeader.getQueueOffset()) { + response.setCode(ResponseCode.PULL_OFFSET_MOVED); + + // XXX: warn and notify me + LOG.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", // + requestHeader.getQueueOffset(), // + getMessageResult.getNextBeginOffset(), // + requestHeader.getTopic(), // + requestHeader.getQueueId(), // + requestHeader.getConsumerGroup()// + ); + } else { + response.setCode(ResponseCode.PULL_NOT_FOUND); + } + break; + case NO_MATCHED_MESSAGE: + response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); + break; + case OFFSET_FOUND_NULL: + response.setCode(ResponseCode.PULL_NOT_FOUND); + break; + case 
OFFSET_OVERFLOW_BADLY: + response.setCode(ResponseCode.PULL_OFFSET_MOVED); + // XXX: warn and notify me + LOG.info("the request offset: " + requestHeader.getQueueOffset() + " over flow badly, broker max offset: " + + getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress()); + break; + case OFFSET_OVERFLOW_ONE: + response.setCode(ResponseCode.PULL_NOT_FOUND); + break; + case OFFSET_TOO_SMALL: + response.setCode(ResponseCode.PULL_OFFSET_MOVED); + LOG.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}", + requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(), + getMessageResult.getMinOffset(), channel.remoteAddress()); + break; + default: + assert false; + break; + } + + if (this.hasConsumeMessageHook()) { + ConsumeMessageContext context = new ConsumeMessageContext(); + context.setConsumerGroup(requestHeader.getConsumerGroup()); + context.setTopic(requestHeader.getTopic()); + context.setQueueId(requestHeader.getQueueId()); + + String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER); + + switch (response.getCode()) { + case ResponseCode.SUCCESS: + int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount(); + int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount; + + context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS); + context.setCommercialRcvTimes(incValue); + context.setCommercialRcvSize(getMessageResult.getBufferTotalSize()); + context.setCommercialOwner(owner); + + break; + case ResponseCode.PULL_NOT_FOUND: + if (!brokerAllowSuspend) { + + + context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS); + context.setCommercialRcvTimes(1); + context.setCommercialOwner(owner); + + } + break; + case ResponseCode.PULL_RETRY_IMMEDIATELY: + case ResponseCode.PULL_OFFSET_MOVED: + context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS); + 
context.setCommercialRcvTimes(1); + context.setCommercialOwner(owner); + break; + default: + assert false; + break; + } + + this.executeConsumeMessageHookBefore(context); + } + + switch (response.getCode()) { + case ResponseCode.SUCCESS: + + this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), + getMessageResult.getMessageCount()); + + this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(), + getMessageResult.getBufferTotalSize()); + + this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount()); + if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { + final long beginTimeMills = this.brokerController.getMessageStore().now(); + final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); + this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(), + requestHeader.getTopic(), requestHeader.getQueueId(), + (int) (this.brokerController.getMessageStore().now() - beginTimeMills)); + response.setBody(r); + } else { + try { + FileRegion fileRegion = + new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult); + channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + getMessageResult.release(); + if (!future.isSuccess()) { + LOG.error("transfer many message by pagecache failed, " + channel.remoteAddress(), future.cause()); + } + } + }); + } catch (Throwable e) { + LOG.error("transfer many message by pagecache exception", e); + getMessageResult.release(); + } + + response = null; + } + break; + case ResponseCode.PULL_NOT_FOUND: + + if (brokerAllowSuspend && hasSuspendFlag) { + long pollingTimeMills = 
suspendTimeoutMillisLong; + if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) { + pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills(); + } + + String topic = requestHeader.getTopic(); + long offset = requestHeader.getQueueOffset(); + int queueId = requestHeader.getQueueId(); + PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, + this.brokerController.getMessageStore().now(), offset, subscriptionData); + this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); + response = null; + break; + } + + + case ResponseCode.PULL_RETRY_IMMEDIATELY: + break; + case ResponseCode.PULL_OFFSET_MOVED: + if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE + || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) { + MessageQueue mq = new MessageQueue(); + mq.setTopic(requestHeader.getTopic()); + mq.setQueueId(requestHeader.getQueueId()); + mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); + + OffsetMovedEvent event = new OffsetMovedEvent(); + event.setConsumerGroup(requestHeader.getConsumerGroup()); + event.setMessageQueue(mq); + event.setOffsetRequest(requestHeader.getQueueOffset()); + event.setOffsetNew(getMessageResult.getNextBeginOffset()); + this.generateOffsetMovedEvent(event); + LOG.warn( + "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}", + requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(), + responseHeader.getSuggestWhichBrokerId()); + } else { + responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId()); + response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); + LOG.warn("PULL_OFFSET_MOVED:none correction. 
topic={}, groupId={}, requestOffset={}, suggestBrokerId={}", + requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(), + responseHeader.getSuggestWhichBrokerId()); + } + + break; + default: + assert false; + } + } else { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("store getMessage return null"); + } + + + boolean storeOffsetEnable = brokerAllowSuspend; + storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag; + storeOffsetEnable = storeOffsetEnable + && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE; + if (storeOffsetEnable) { + this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), + requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); + } + return response; + } + + + public boolean hasConsumeMessageHook() { + return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty(); + } + + public void executeConsumeMessageHookBefore(final ConsumeMessageContext context) { + if (hasConsumeMessageHook()) { + for (ConsumeMessageHook hook : this.consumeMessageHookList) { + try { + hook.consumeMessageBefore(context); + } catch (Throwable e) { + } + } + } + } + + private byte[] readGetMessageResult(final GetMessageResult getMessageResult, final String group, final String topic, final int queueId) { + final ByteBuffer byteBuffer = ByteBuffer.allocate(getMessageResult.getBufferTotalSize()); + + long storeTimestamp = 0; + try { + List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList(); + for (ByteBuffer bb : messageBufferList) { + + byteBuffer.put(bb); + storeTimestamp = bb.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION); + } + } finally { + getMessageResult.release(); + } + + this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId, this.brokerController.getMessageStore().now() - 
storeTimestamp); + return byteBuffer.array(); + } + + private void generateOffsetMovedEvent(final OffsetMovedEvent event) { + try { + MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); + msgInner.setTopic(MixAll.OFFSET_MOVED_EVENT); + msgInner.setTags(event.getConsumerGroup()); + msgInner.setDelayTimeLevel(0); + msgInner.setKeys(event.getConsumerGroup()); + msgInner.setBody(event.encode()); + msgInner.setFlag(0); + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(TopicFilterType.SINGLE_TAG, msgInner.getTags())); + + msgInner.setQueueId(0); + msgInner.setSysFlag(0); + msgInner.setBornTimestamp(System.currentTimeMillis()); + msgInner.setBornHost(RemotingUtil.string2SocketAddress(this.brokerController.getBrokerAddr())); + msgInner.setStoreHost(msgInner.getBornHost()); + + msgInner.setReconsumeTimes(0); + + PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); + } catch (Exception e) { + LOG.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e); + } + } + + public void excuteRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException { + Runnable run = new Runnable() { + @Override + public void run() { + try { + final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false); + + if (response != null) { + response.setOpaque(request.getOpaque()); + response.markResponseType(); + try { + channel.writeAndFlush(response).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + LOG.error("processRequestWrapper response to " + future.channel().remoteAddress() + " failed", + future.cause()); + LOG.error(request.toString()); + LOG.error(response.toString()); + } + } + }); + } catch (Throwable e) { + 
LOG.error("processRequestWrapper process request over, but response failed", e); + LOG.error(request.toString()); + LOG.error(response.toString()); + } + } + } catch (RemotingCommandException e1) { + LOG.error("excuteRequestWhenWakeup run", e1); + } + } + }; + + this.brokerController.getPullMessageExecutor().submit(run); + } + + public void registerConsumeMessageHook(List<ConsumeMessageHook> sendMessageHookList) { + this.consumeMessageHookList = sendMessageHookList; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java new file mode 100644 index 0000000..5390e28 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.broker.processor; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.pagecache.OneMessageTransfer; +import org.apache.rocketmq.broker.pagecache.QueryMessageTransfer; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader; +import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.store.QueryMessageResult; +import org.apache.rocketmq.store.SelectMappedBufferResult; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.FileRegion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * @author shijia.wxr + */ +public class QueryMessageProcessor implements NettyRequestProcessor { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + + private final BrokerController brokerController; + + + public QueryMessageProcessor(final BrokerController brokerController) { + this.brokerController = brokerController; + } + + + @Override + public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) + throws RemotingCommandException { + switch (request.getCode()) { + case RequestCode.QUERY_MESSAGE: + return this.queryMessage(ctx, request); + case RequestCode.VIEW_MESSAGE_BY_ID: + return this.viewMessageById(ctx, request); + default: + break; + } + + return null; + } + + 
@Override + public boolean rejectRequest() { + return false; + } + + + public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand request) + throws RemotingCommandException { + final RemotingCommand response = + RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class); + final QueryMessageResponseHeader responseHeader = + (QueryMessageResponseHeader) response.readCustomHeader(); + final QueryMessageRequestHeader requestHeader = + (QueryMessageRequestHeader) request + .decodeCommandCustomHeader(QueryMessageRequestHeader.class); + + + response.setOpaque(request.getOpaque()); + + + String isUniqueKey = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG); + if (isUniqueKey != null && isUniqueKey.equals("true")) { + requestHeader.setMaxNum(this.brokerController.getMessageStoreConfig().getDefaultQueryMaxNum()); + } + + final QueryMessageResult queryMessageResult = + this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(), + requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(), + requestHeader.getEndTimestamp()); + assert queryMessageResult != null; + + responseHeader.setIndexLastUpdatePhyoffset(queryMessageResult.getIndexLastUpdatePhyoffset()); + responseHeader.setIndexLastUpdateTimestamp(queryMessageResult.getIndexLastUpdateTimestamp()); + + + if (queryMessageResult.getBufferTotalSize() > 0) { + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + + try { + FileRegion fileRegion = + new QueryMessageTransfer(response.encodeHeader(queryMessageResult + .getBufferTotalSize()), queryMessageResult); + ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + queryMessageResult.release(); + if (!future.isSuccess()) { + log.error("transfer query message by pagecache failed, ", future.cause()); + } + } + }); + } catch (Throwable e) { + log.error("", e); + 
queryMessageResult.release(); + } + + return null; + } + + response.setCode(ResponseCode.QUERY_NOT_FOUND); + response.setRemark("can not find message, maybe time range not correct"); + return response; + } + + + public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request) + throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + final ViewMessageRequestHeader requestHeader = + (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class); + + + response.setOpaque(request.getOpaque()); + + final SelectMappedBufferResult selectMappedBufferResult = + this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset()); + if (selectMappedBufferResult != null) { + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + + try { + FileRegion fileRegion = + new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()), + selectMappedBufferResult); + ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + selectMappedBufferResult.release(); + if (!future.isSuccess()) { + log.error("transfer one message by pagecache failed, ", future.cause()); + } + } + }); + } catch (Throwable e) { + log.error("", e); + selectMappedBufferResult.release(); + } + + return null; + } else { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("can not find message by the offset, " + requestHeader.getOffset()); + } + + return response; + } +}