http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java new file mode 100644 index 0000000..ed10f1b --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.broker.processor; + +import com.alibaba.rocketmq.broker.BrokerController; +import com.alibaba.rocketmq.common.TopicFilterType; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.message.MessageAccessor; +import com.alibaba.rocketmq.common.message.MessageConst; +import com.alibaba.rocketmq.common.message.MessageDecoder; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.protocol.ResponseCode; +import com.alibaba.rocketmq.common.protocol.header.EndTransactionRequestHeader; +import com.alibaba.rocketmq.common.sysflag.MessageSysFlag; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; +import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; +import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import com.alibaba.rocketmq.store.MessageExtBrokerInner; +import com.alibaba.rocketmq.store.MessageStore; +import com.alibaba.rocketmq.store.PutMessageResult; +import io.netty.channel.ChannelHandlerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * @author shijia.wxr + */ +public class EndTransactionProcessor implements NettyRequestProcessor { + private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); + private final BrokerController brokerController; + + public EndTransactionProcessor(final BrokerController brokerController) { + this.brokerController = brokerController; + } + + @Override + public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + final EndTransactionRequestHeader requestHeader = + (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class); + + + if (requestHeader.getFromTransactionCheck()) 
{ + switch (requestHeader.getCommitOrRollback()) { + case MessageSysFlag.TRANSACTION_NOT_TYPE: { + LOGGER.warn("check producer[{}] transaction state, but it's pending status." + + "RequestHeader: {} Remark: {}", + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + requestHeader.toString(), + request.getRemark()); + return null; + } + + case MessageSysFlag.TRANSACTION_COMMIT_TYPE: { + LOGGER.warn("check producer[{}] transaction state, the producer commit the message." + + "RequestHeader: {} Remark: {}", + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + requestHeader.toString(), + request.getRemark()); + + break; + } + + case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: { + LOGGER.warn("check producer[{}] transaction state, the producer rollback the message." + + "RequestHeader: {} Remark: {}", + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + requestHeader.toString(), + request.getRemark()); + break; + } + default: + return null; + } + } else { + switch (requestHeader.getCommitOrRollback()) { + case MessageSysFlag.TRANSACTION_NOT_TYPE: { + LOGGER.warn("the producer[{}] end transaction in sending message, and it's pending status." + + "RequestHeader: {} Remark: {}", + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + requestHeader.toString(), + request.getRemark()); + return null; + } + + case MessageSysFlag.TRANSACTION_COMMIT_TYPE: { + break; + } + + case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: { + LOGGER.warn("the producer[{}] end transaction in sending message, rollback the message." 
+ + "RequestHeader: {} Remark: {}", + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + requestHeader.toString(), + request.getRemark()); + break; + } + default: + return null; + } + } + + final MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getCommitLogOffset()); + if (msgExt != null) { + final String pgroupRead = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); + if (!pgroupRead.equals(requestHeader.getProducerGroup())) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("the producer group wrong"); + return response; + } + + if (msgExt.getQueueOffset() != requestHeader.getTranStateTableOffset()) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("the transaction state table offset wrong"); + return response; + } + + if (msgExt.getCommitLogOffset() != requestHeader.getCommitLogOffset()) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("the commit log offset wrong"); + return response; + } + + MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt); + msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback())); + + msgInner.setQueueOffset(requestHeader.getTranStateTableOffset()); + msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset()); + msgInner.setStoreTimestamp(msgExt.getStoreTimestamp()); + if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { + msgInner.setBody(null); + } + + final MessageStore messageStore = this.brokerController.getMessageStore(); + final PutMessageResult putMessageResult = messageStore.putMessage(msgInner); + if (putMessageResult != null) { + switch (putMessageResult.getPutMessageStatus()) { + // Success + case PUT_OK: + case FLUSH_DISK_TIMEOUT: + case FLUSH_SLAVE_TIMEOUT: + case SLAVE_NOT_AVAILABLE: + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + break; + // Failed + case 
CREATE_MAPEDFILE_FAILED: + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("create maped file failed."); + break; + case MESSAGE_ILLEGAL: + case PROPERTIES_SIZE_EXCEEDED: + response.setCode(ResponseCode.MESSAGE_ILLEGAL); + response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."); + break; + case SERVICE_NOT_AVAILABLE: + response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); + response.setRemark("service not available now."); + break; + case OS_PAGECACHE_BUSY: + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("OS page cache busy, please try another machine"); + break; + case UNKNOWN_ERROR: + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("UNKNOWN_ERROR"); + break; + default: + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("UNKNOWN_ERROR DEFAULT"); + break; + } + + return response; + } else { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("store putMessage return null"); + } + } else { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("find prepared transaction message failed"); + return response; + } + + return response; + } + + @Override + public boolean rejectRequest() { + return false; + } + + private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) { + MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); + msgInner.setBody(msgExt.getBody()); + msgInner.setFlag(msgExt.getFlag()); + MessageAccessor.setProperties(msgInner, msgExt.getProperties()); + + TopicFilterType topicFilterType = + (msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? 
TopicFilterType.MULTI_TAG + : TopicFilterType.SINGLE_TAG; + long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags()); + msgInner.setTagsCode(tagsCodeValue); + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); + + msgInner.setSysFlag(msgExt.getSysFlag()); + msgInner.setBornTimestamp(msgExt.getBornTimestamp()); + msgInner.setBornHost(msgExt.getBornHost()); + msgInner.setStoreHost(msgExt.getStoreHost()); + msgInner.setReconsumeTimes(msgExt.getReconsumeTimes()); + + msgInner.setWaitStoreMsgOK(false); + MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL); + + msgInner.setTopic(msgExt.getTopic()); + msgInner.setQueueId(msgExt.getQueueId()); + + return msgInner; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java new file mode 100644 index 0000000..a92ead0 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.broker.processor; + +import com.alibaba.rocketmq.broker.BrokerController; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import io.netty.channel.ChannelHandlerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * @author shijia.wxr + */ +public class ForwardRequestProcessor implements NettyRequestProcessor { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + + private final BrokerController brokerController; + + + public ForwardRequestProcessor(final BrokerController brokerController) { + this.brokerController = brokerController; + } + + + @Override + public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { + return null; + } + + @Override + public boolean rejectRequest() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java new file mode 100644 index 0000000..0152b93 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java @@ -0,0 +1,540 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.broker.processor;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.client.ConsumerGroupInfo;
import com.alibaba.rocketmq.broker.longpolling.PullRequest;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageContext;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
import com.alibaba.rocketmq.broker.pagecache.ManyMessageTransfer;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.TopicConfig;
import com.alibaba.rocketmq.common.TopicFilterType;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.constant.PermName;
import com.alibaba.rocketmq.common.filter.FilterAPI;
import com.alibaba.rocketmq.common.help.FAQUrl;
import com.alibaba.rocketmq.common.message.MessageDecoder;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.ResponseCode;
import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.PullMessageResponseHeader;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import com.alibaba.rocketmq.common.protocol.topic.OffsetMovedEvent;
import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.common.RemotingUtil;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.store.GetMessageResult;
import com.alibaba.rocketmq.store.MessageExtBrokerInner;
import com.alibaba.rocketmq.store.PutMessageResult;
import com.alibaba.rocketmq.store.config.BrokerRole;
import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
import io.netty.channel.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.List;


/**
 * Serves pull-message requests: validates broker, group, topic and subscription state, reads
 * messages from the store, and either answers, transfers the messages directly on the channel,
 * or parks the request in the pull-request hold service until it is woken up again.
 *
 * @author shijia.wxr
 */
public class PullMessageProcessor implements NettyRequestProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;
    private List<ConsumeMessageHook> consumeMessageHookList;

    public PullMessageProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * Entry point for requests arriving from the network; suspension is allowed.
     */
    @Override
    public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        return this.processRequest(ctx.channel(), request, true);
    }

    @Override
    public boolean rejectRequest() {
        return false;
    }

    /**
     * Processes one pull request.
     *
     * @param channel            channel of the pulling consumer
     * @param request            command carrying a {@link PullMessageRequestHeader}
     * @param brokerAllowSuspend whether the request may be parked when no message is found;
     *                           also gates committing the consumer offset carried by the request
     * @return the response to write back, or {@code null} when the messages were already written
     *         to the channel as a file region or the request was suspended
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
        throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
        final PullMessageRequestHeader requestHeader =
            (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

        response.setOpaque(request.getOpaque());

        if (LOG.isDebugEnabled()) {
            LOG.debug("receive PullMessage request command, " + request);
        }

        // --- broker / subscription group permission checks ---
        if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] pulling message is forbidden");
            return response;
        }

        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark("subscription group not exist, " + requestHeader.getConsumerGroup() + " "
                + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return response;
        }

        if (!subscriptionGroupConfig.isConsumeEnable()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
            return response;
        }

        final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
        final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
        final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());

        final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;

        // --- topic and queue checks ---
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        if (null == topicConfig) {
            LOG.error("the topic " + requestHeader.getTopic() + " not exist, consumer: " + RemotingHelper.parseChannelRemoteAddr(channel));
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark(
                "topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
            return response;
        }

        if (!PermName.isReadable(topicConfig.getPerm())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
            return response;
        }

        if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
            String errorInfo = "queueId[" + requestHeader.getQueueId() + "] is illagal,Topic :" + requestHeader.getTopic()
                + " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();
            LOG.warn(errorInfo);
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(errorInfo);
            return response;
        }

        // --- resolve the subscription: either carried by the request or registered on the broker ---
        SubscriptionData subscriptionData = null;
        if (hasSubscriptionFlag) {
            try {
                subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                    requestHeader.getSubscription());
            } catch (Exception e) {
                LOG.warn("parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
                    requestHeader.getConsumerGroup());
                response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
                response.setRemark("parse the consumer's subscription failed");
                return response;
            }
        } else {
            ConsumerGroupInfo consumerGroupInfo =
                this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
            if (null == consumerGroupInfo) {
                LOG.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
                response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
                response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
                return response;
            }

            if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
                && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
                return response;
            }

            subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
            if (null == subscriptionData) {
                LOG.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
                response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
                response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
                return response;
            }

            if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
                LOG.warn("the broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
                    subscriptionData.getSubString());
                response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
                response.setRemark("the consumer's subscription not latest");
                return response;
            }
        }

        // --- read from the store ---
        final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);
        if (getMessageResult != null) {
            response.setRemark(getMessageResult.getStatus().name());
            responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
            responseHeader.setMinOffset(getMessageResult.getMinOffset());
            responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

            // NOTE(review): this first suggestion is replaced unconditionally by the
            // isSlaveReadEnable() block further down.
            if (getMessageResult.isSuggestPullingFromSlave()) {
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            } else {
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }

            // NOTE(review): the response code set for SLAVE here is overwritten by every status
            // handled in the getStatus() switch below - confirm whether that is intended.
            switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    break;
                case SLAVE:
                    if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                    }
                    break;
            }

            if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                if (getMessageResult.isSuggestPullingFromSlave()) {
                    // consume too slow, redirect to another machine
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
                } else {
                    // consume ok
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                }
            } else {
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }

            // --- map the store status to a response code ---
            switch (getMessageResult.getStatus()) {
                case FOUND:
                    response.setCode(ResponseCode.SUCCESS);
                    break;
                case MESSAGE_WAS_REMOVING:
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    break;
                case NO_MATCHED_LOGIC_QUEUE:
                case NO_MESSAGE_IN_QUEUE:
                    if (0 != requestHeader.getQueueOffset()) {
                        response.setCode(ResponseCode.PULL_OFFSET_MOVED);

                        // XXX: warn and notify me
                        LOG.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
                            requestHeader.getQueueOffset(),
                            getMessageResult.getNextBeginOffset(),
                            requestHeader.getTopic(),
                            requestHeader.getQueueId(),
                            requestHeader.getConsumerGroup()
                        );
                    } else {
                        response.setCode(ResponseCode.PULL_NOT_FOUND);
                    }
                    break;
                case NO_MATCHED_MESSAGE:
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    break;
                case OFFSET_FOUND_NULL:
                    response.setCode(ResponseCode.PULL_NOT_FOUND);
                    break;
                case OFFSET_OVERFLOW_BADLY:
                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                    // XXX: warn and notify me
                    LOG.info("the request offset: " + requestHeader.getQueueOffset() + " over flow badly, broker max offset: "
                        + getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress());
                    break;
                case OFFSET_OVERFLOW_ONE:
                    response.setCode(ResponseCode.PULL_NOT_FOUND);
                    break;
                case OFFSET_TOO_SMALL:
                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                    LOG.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
                        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
                        getMessageResult.getMinOffset(), channel.remoteAddress());
                    break;
                default:
                    assert false;
                    break;
            }

            // --- notify registered consume hooks ---
            if (this.hasConsumeMessageHook()) {
                ConsumeMessageContext context = new ConsumeMessageContext();
                context.setConsumerGroup(requestHeader.getConsumerGroup());
                context.setTopic(requestHeader.getTopic());
                context.setQueueId(requestHeader.getQueueId());

                String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);

                switch (response.getCode()) {
                    case ResponseCode.SUCCESS:
                        context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
                        context.setCommercialRcvTimes(getMessageResult.getMsgCount4Commercial());
                        context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
                        context.setCommercialOwner(owner);
                        break;
                    case ResponseCode.PULL_NOT_FOUND:
                        if (!brokerAllowSuspend) {
                            context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                            context.setCommercialRcvTimes(1);
                            context.setCommercialOwner(owner);
                        }
                        break;
                    case ResponseCode.PULL_RETRY_IMMEDIATELY:
                    case ResponseCode.PULL_OFFSET_MOVED:
                        context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                        context.setCommercialRcvTimes(1);
                        context.setCommercialOwner(owner);
                        break;
                    default:
                        assert false;
                        break;
                }

                this.executeConsumeMessageHookBefore(context);
            }

            // --- act on the response code ---
            switch (response.getCode()) {
                case ResponseCode.SUCCESS:
                    this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                        getMessageResult.getMessageCount());

                    this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                        getMessageResult.getBufferTotalSize());

                    this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
                    if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                        // Copy the messages into a heap array and return them as the response body.
                        final long beginTimeMills = this.brokerController.getMessageStore().now();
                        final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
                        this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
                            requestHeader.getTopic(), requestHeader.getQueueId(),
                            (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
                        response.setBody(r);
                    } else {
                        // Write header and messages straight to the channel; the result is released
                        // once the write completes (or immediately if starting the write fails).
                        try {
                            FileRegion fileRegion =
                                new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
                            channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                                @Override
                                public void operationComplete(ChannelFuture future) throws Exception {
                                    getMessageResult.release();
                                    if (!future.isSuccess()) {
                                        LOG.error("transfer many message by pagecache failed, " + channel.remoteAddress(), future.cause());
                                    }
                                }
                            });
                        } catch (Throwable e) {
                            LOG.error("transfer many message by pagecache exception", e);
                            getMessageResult.release();
                        }

                        // Already written above; nothing left for the caller to send.
                        response = null;
                    }
                    break;
                case ResponseCode.PULL_NOT_FOUND:
                    if (brokerAllowSuspend && hasSuspendFlag) {
                        long pollingTimeMills = suspendTimeoutMillisLong;
                        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                        }

                        String topic = requestHeader.getTopic();
                        long offset = requestHeader.getQueueOffset();
                        int queueId = requestHeader.getQueueId();
                        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                            this.brokerController.getMessageStore().now(), offset, subscriptionData);
                        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                        response = null;
                        break;
                    }
                    // fall through: not suspended, so the PULL_NOT_FOUND response is returned as is
                case ResponseCode.PULL_RETRY_IMMEDIATELY:
                    break;
                case ResponseCode.PULL_OFFSET_MOVED:
                    if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
                        || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
                        MessageQueue mq = new MessageQueue();
                        mq.setTopic(requestHeader.getTopic());
                        mq.setQueueId(requestHeader.getQueueId());
                        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());

                        OffsetMovedEvent event = new OffsetMovedEvent();
                        event.setConsumerGroup(requestHeader.getConsumerGroup());
                        event.setMessageQueue(mq);
                        event.setOffsetRequest(requestHeader.getQueueOffset());
                        event.setOffsetNew(getMessageResult.getNextBeginOffset());
                        this.generateOffsetMovedEvent(event);
                        LOG.warn(
                            "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
                            requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
                            responseHeader.getSuggestWhichBrokerId());
                    } else {
                        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        LOG.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
                            requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
                            responseHeader.getSuggestWhichBrokerId());
                    }

                    break;
                default:
                    assert false;
            }
        } else {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("store getMessage return null");
        }

        // Commit the consumer offset carried by the request, but only on the first pass
        // (brokerAllowSuspend), when the commit flag is set, and when this broker is not a slave.
        boolean storeOffsetEnable = brokerAllowSuspend;
        storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
        storeOffsetEnable = storeOffsetEnable
            && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
        if (storeOffsetEnable) {
            this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
                requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
        }
        return response;
    }

    /**
     * @return {@code true} when at least one consume hook has been registered
     */
    public boolean hasConsumeMessageHook() {
        return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
    }

    /**
     * Invokes {@code consumeMessageBefore} on every registered hook. A failing hook never
     * interrupts request processing or the remaining hooks; the failure is logged instead of
     * being silently dropped.
     *
     * @param context the consume context handed to each hook
     */
    public void executeConsumeMessageHookBefore(final ConsumeMessageContext context) {
        if (hasConsumeMessageHook()) {
            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                try {
                    hook.consumeMessageBefore(context);
                } catch (Throwable e) {
                    // Best effort by design: keep going, but leave a trace of the failure.
                    LOG.warn("consumeMessageBefore hook threw an exception", e);
                }
            }
        }
    }

    /**
     * Copies all message buffers of the result into one heap array, releases the result, and
     * records how far the last copied message's store timestamp lags behind the store's clock.
     *
     * @return the concatenated message bytes
     */
    private byte[] readGetMessageResult(final GetMessageResult getMessageResult, final String group, final String topic, final int queueId) {
        final ByteBuffer byteBuffer = ByteBuffer.allocate(getMessageResult.getBufferTotalSize());

        long storeTimestamp = 0;
        try {
            List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList();
            for (ByteBuffer bb : messageBufferList) {
                byteBuffer.put(bb);
                // Absolute read, so it is unaffected by the position change caused by put().
                storeTimestamp = bb.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
            }
        } finally {
            getMessageResult.release();
        }

        this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId,
            this.brokerController.getMessageStore().now() - storeTimestamp);
        return byteBuffer.array();
    }

    /**
     * Stores an offset-moved event as a message on the {@code MixAll.OFFSET_MOVED_EVENT} topic.
     * Failures are logged and never propagated to the pull request being processed.
     *
     * @param event the event to publish
     */
    private void generateOffsetMovedEvent(final OffsetMovedEvent event) {
        try {
            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
            msgInner.setTopic(MixAll.OFFSET_MOVED_EVENT);
            msgInner.setTags(event.getConsumerGroup());
            msgInner.setDelayTimeLevel(0);
            msgInner.setKeys(event.getConsumerGroup());
            msgInner.setBody(event.encode());
            msgInner.setFlag(0);
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
            msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(TopicFilterType.SINGLE_TAG, msgInner.getTags()));

            msgInner.setQueueId(0);
            msgInner.setSysFlag(0);
            msgInner.setBornTimestamp(System.currentTimeMillis());
            msgInner.setBornHost(RemotingUtil.string2SocketAddress(this.brokerController.getBrokerAddr()));
            msgInner.setStoreHost(msgInner.getBornHost());

            msgInner.setReconsumeTimes(0);

            final PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
            if (putMessageResult == null) {
                // The event was not stored; make that visible rather than ignoring the result.
                LOG.warn("generateOffsetMovedEvent store putMessage return null, {}", event);
            }
        } catch (Exception e) {
            LOG.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e);
        }
    }

    /**
     * Re-processes a previously suspended pull request on the pull-message executor, this time
     * without allowing it to be suspended again, and writes any resulting response to the channel.
     *
     * @param channel channel of the consumer that issued the request
     * @param request the original pull request
     */
    public void excuteRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);

                    if (response != null) {
                        response.setOpaque(request.getOpaque());
                        response.markResponseType();
                        try {
                            channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
                                @Override
                                public void operationComplete(ChannelFuture future) throws Exception {
                                    if (!future.isSuccess()) {
                                        LOG.error("processRequestWrapper response to " + future.channel().remoteAddress() + " failed",
                                            future.cause());
                                        LOG.error(request.toString());
                                        LOG.error(response.toString());
                                    }
                                }
                            });
                        } catch (Throwable e) {
                            LOG.error("processRequestWrapper process request over, but response failed", e);
                            LOG.error(request.toString());
                            LOG.error(response.toString());
                        }
                    }
                } catch (RemotingCommandException e1) {
                    LOG.error("excuteRequestWhenWakeup run", e1);
                }
            }
        };

        this.brokerController.getPullMessageExecutor().submit(run);
    }

    /**
     * Replaces the list of hooks notified for every pull request.
     *
     * @param sendMessageHookList the consume-message hooks to use (may be {@code null} or empty
     *                            to disable hook notification)
     */
    public void registerConsumeMessageHook(List<ConsumeMessageHook> sendMessageHookList) {
        this.consumeMessageHookList = sendMessageHookList;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java new file mode 100644 index 0000000..738d11f --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package com.alibaba.rocketmq.broker.processor;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.pagecache.OneMessageTransfer;
import com.alibaba.rocketmq.broker.pagecache.QueryMessageTransfer;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.protocol.RequestCode;
import com.alibaba.rocketmq.common.protocol.ResponseCode;
import com.alibaba.rocketmq.common.protocol.header.QueryMessageRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.QueryMessageResponseHeader;
import com.alibaba.rocketmq.common.protocol.header.ViewMessageRequestHeader;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.store.QueryMessageResult;
import com.alibaba.rocketmq.store.SelectMappedBufferResult;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Request processor for message lookups: query by key ({@code QUERY_MESSAGE}) and fetch by
 * commit-log offset ({@code VIEW_MESSAGE_BY_ID}). Found messages are streamed to the channel
 * as a {@link FileRegion}; in that case the handler methods return {@code null} because the
 * response has already been written.
 *
 * @author shijia.wxr
 */
public class QueryMessageProcessor implements NettyRequestProcessor {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

    private final BrokerController brokerController;


    public QueryMessageProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }


    /**
     * Routes the request to the matching handler by request code.
     *
     * @return the handler's result, or {@code null} for any other request code
     */
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        final int code = request.getCode();
        if (code == RequestCode.QUERY_MESSAGE) {
            return this.queryMessage(ctx, request);
        }
        if (code == RequestCode.VIEW_MESSAGE_BY_ID) {
            return this.viewMessageById(ctx, request);
        }
        return null;
    }

    /** This processor never rejects requests. */
    @Override
    public boolean rejectRequest() {
        return false;
    }


    /**
     * Queries the message store by topic, key and time range.
     *
     * @param ctx channel context the matching messages are written to
     * @param request the query request
     * @return {@code null} when matches were found and streamed to the channel; otherwise a
     *         {@code QUERY_NOT_FOUND} response
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class);
        final QueryMessageResponseHeader respHeader = (QueryMessageResponseHeader) response.readCustomHeader();
        final QueryMessageRequestHeader reqHeader =
                (QueryMessageRequestHeader) request.decodeCommandCustomHeader(QueryMessageRequestHeader.class);

        response.setOpaque(request.getOpaque());

        // A unique-key query overrides the caller's limit with the store's configured default.
        final String uniqueKeyFlag = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG);
        if ("true".equals(uniqueKeyFlag)) {
            reqHeader.setMaxNum(this.brokerController.getMessageStoreConfig().getDefaultQueryMaxNum());
        }

        final QueryMessageResult found = this.brokerController.getMessageStore().queryMessage(
                reqHeader.getTopic(),
                reqHeader.getKey(),
                reqHeader.getMaxNum(),
                reqHeader.getBeginTimestamp(),
                reqHeader.getEndTimestamp());
        assert found != null;

        respHeader.setIndexLastUpdatePhyoffset(found.getIndexLastUpdatePhyoffset());
        respHeader.setIndexLastUpdateTimestamp(found.getIndexLastUpdateTimestamp());

        // Nothing matched: answer with an ordinary response.
        if (found.getBufferTotalSize() <= 0) {
            response.setCode(ResponseCode.QUERY_NOT_FOUND);
            response.setRemark("can not find message, maybe time range not correct");
            return response;
        }

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);

        try {
            final FileRegion region = new QueryMessageTransfer(response.encodeHeader(found.getBufferTotalSize()), found);
            final ChannelFutureListener releaseWhenDone = new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    found.release();
                    if (!future.isSuccess()) {
                        log.error("transfer query message by pagecache failed, ", future.cause());
                    }
                }
            };
            ctx.channel().writeAndFlush(region).addListener(releaseWhenDone);
        } catch (Throwable e) {
            log.error("", e);
            found.release();
        }

        // The response has been (or was attempted to be) written directly to the channel.
        return null;
    }


    /**
     * Looks up a single message by its offset.
     *
     * @param ctx channel context the message is written to
     * @param request the view request
     * @return {@code null} when the message was found and streamed to the channel; otherwise a
     *         {@code SYSTEM_ERROR} response naming the offset
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final ViewMessageRequestHeader reqHeader =
                (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);

        response.setOpaque(request.getOpaque());

        final SelectMappedBufferResult selected =
                this.brokerController.getMessageStore().selectOneMessageByOffset(reqHeader.getOffset());
        if (selected == null) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("can not find message by the offset, " + reqHeader.getOffset());
            return response;
        }

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);

        try {
            final FileRegion region = new OneMessageTransfer(response.encodeHeader(selected.getSize()), selected);
            final ChannelFutureListener releaseWhenDone = new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    selected.release();
                    if (!future.isSuccess()) {
                        log.error("transfer one message by pagecache failed, ", future.cause());
                    }
                }
            };
            ctx.channel().writeAndFlush(region).addListener(releaseWhenDone);
        } catch (Throwable e) {
            log.error("", e);
            selected.release();
        }

        return null;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java ---------------------------------------------------------------------- diff --git 
a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java new file mode 100644 index 0000000..414b3f4 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java @@ -0,0 +1,496 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package com.alibaba.rocketmq.broker.processor;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageContext;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
import com.alibaba.rocketmq.broker.mqtrace.SendMessageContext;
import com.alibaba.rocketmq.common.*;
import com.alibaba.rocketmq.common.constant.PermName;
import com.alibaba.rocketmq.common.help.FAQUrl;
import com.alibaba.rocketmq.common.message.MessageAccessor;
import com.alibaba.rocketmq.common.message.MessageConst;
import com.alibaba.rocketmq.common.message.MessageDecoder;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.RequestCode;
import com.alibaba.rocketmq.common.protocol.ResponseCode;
import com.alibaba.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.SendMessageResponseHeader;
import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
import com.alibaba.rocketmq.common.sysflag.TopicSysFlag;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.store.MessageExtBrokerInner;
import com.alibaba.rocketmq.store.PutMessageResult;
import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
import io.netty.channel.ChannelHandlerContext;

import java.net.SocketAddress;
import java.util.List;


/**
 * Request processor for producer sends and for consumer "send message back" (retry) requests.
 *
 * @author shijia.wxr
 */
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

    /** Hooks notified after a consumer send-back request; may be null until registered. */
    private List<ConsumeMessageHook> consumeMessageHookList;

    public SendMessageProcessor(final BrokerController brokerController) {
        super(brokerController);
    }

    /**
     * Dispatches a request: {@code CONSUMER_SEND_MSG_BACK} goes to
     * {@link #consumerSendMsgBack}; every other code is handled as a message send, wrapped by
     * the send-message hooks.
     *
     * @return the response to write, or {@code null} when the request header could not be
     *         parsed or when {@link #sendMessage} has already written the response itself
     */
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.consumerSendMsgBack(ctx, request);
            default:
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return null;
                }

                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
                final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);

                this.executeSendMessageHookAfter(response, mqtraceContext);
                return response;
        }
    }

    /**
     * Rejects new requests while the store reports a busy OS page cache or a deficient
     * transient store pool.
     */
    @Override
    public boolean rejectRequest() {
        return this.brokerController.getMessageStore().isOSPageCacheBusy() ||
                this.brokerController.getMessageStore().isTransientStorePoolDeficient();
    }

    /**
     * Handles a consumer's request to send a message back for later re-consumption.
     *
     * <p>The original message is looked up by offset and re-stored under the group's retry
     * topic, or under the group's DLQ topic when the reconsume limit is reached or the requested
     * delay level is negative.
     *
     * @param ctx channel context of the requesting consumer
     * @param request the send-back request
     * @return the response describing success or the reason for failure
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
            throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final ConsumerSendMsgBackRequestHeader requestHeader =
                (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

        // Notify consume hooks first; only requests that carry an origin message id are reported.
        if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {

            ConsumeMessageContext context = new ConsumeMessageContext();
            context.setConsumerGroup(requestHeader.getGroup());
            context.setTopic(requestHeader.getOriginTopic());
            context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
            context.setCommercialRcvTimes(1);
            context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));

            this.executeConsumeMessageHookAfter(context);
        }


        SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
                    + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return response;
        }


        if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
            return response;
        }


        // A group configured without retry queues: acknowledge and drop the send-back.
        if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }

        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();


        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
        }


        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                subscriptionGroupConfig.getRetryQueueNums(),
                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        if (null == topicConfig) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("topic[" + newTopic + "] not exist");
            return response;
        }


        if (!PermName.isWriteable(topicConfig.getPerm())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
            return response;
        }

        MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
        if (null == msgExt) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("look message by offset failed, " + requestHeader.getOffset());
            return response;
        }


        // Remember the topic the message first came from, unless an earlier retry already did.
        final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (null == retryTopic) {
            MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
        }
        msgExt.setWaitStoreMsgOK(false);


        int delayLevel = requestHeader.getDelayLevel();


        // Clients at or above V3_4_9 supply their own reconsume limit in the header.
        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
        }


        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
                || delayLevel < 0) {
            // Limit reached or negative delay level: route to the group's DLQ topic instead.
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                    DLQ_NUMS_PER_GROUP,
                    PermName.PERM_WRITE, 0
            );
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return response;
            }
        } else {
            // Delay level 0 means "broker decides": 3 plus the number of previous reconsumes.
            if (0 == delayLevel) {
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }

            msgExt.setDelayTimeLevel(delayLevel);
        }

        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic);
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        // Keep the first message id across repeated retries.
        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);

        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        if (putMessageResult != null) {
            switch (putMessageResult.getPutMessageStatus()) {
                case PUT_OK:
                    // Statistics are recorded against the original topic when it is known.
                    String backTopic = msgExt.getTopic();
                    String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                    if (correctTopic != null) {
                        backTopic = correctTopic;
                    }

                    this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);

                    response.setCode(ResponseCode.SUCCESS);
                    response.setRemark(null);

                    return response;
                default:
                    break;
            }

            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(putMessageResult.getPutMessageStatus().name());
            return response;
        }

        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("putMessageResult is null");
        return response;
    }

    /**
     * Stores a message sent by a producer.
     *
     * <p>When the store reports {@code PUT_OK}, {@code FLUSH_DISK_TIMEOUT},
     * {@code FLUSH_SLAVE_TIMEOUT} or {@code SLAVE_NOT_AVAILABLE}, the response is written via
     * {@code doResponse} and this method returns {@code null}. In every other case the
     * populated error response is returned for the caller to write.
     *
     * @param ctx channel context of the producer
     * @param request the send request (its body is the message body)
     * @param sendMessageContext trace context filled with the outcome when send hooks exist
     * @param requestHeader the already decoded request header
     * @return {@code null} if the response was already written, otherwise the error response
     * @throws RemotingCommandException declared for callers
     */
    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                        final RemotingCommand request,
                                        final SendMessageContext sendMessageContext,
                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();


        response.setOpaque(request.getOpaque());

        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

        if (log.isDebugEnabled()) {
            log.debug("receive SendMessage request command, " + request);
        }

        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
        if (this.brokerController.getMessageStore().now() < startTimstamp) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
            return response;
        }

        // -1 is a sentinel: msgCheck overwrites the code only when it rejects the message.
        response.setCode(-1);
        super.msgCheck(ctx, requestHeader, response);
        if (response.getCode() != -1) {
            return response;
        }

        final byte[] body = request.getBody();

        int queueIdInt = requestHeader.getQueueId();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

        // A negative queue id asks the broker to pick a write queue at random.
        if (queueIdInt < 0) {
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
        }

        int sysFlag = requestHeader.getSysFlag();

        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
        }

        String newTopic = requestHeader.getTopic();
        // Sends addressed to a retry topic are redirected to the DLQ once the limit is reached.
        if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
            SubscriptionGroupConfig subscriptionGroupConfig =
                    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
            if (null == subscriptionGroupConfig) {
                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
                response.setRemark(
                        "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
                return response;
            }


            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
                maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
            }
            int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
            if (reconsumeTimes >= maxReconsumeTimes) {
                newTopic = MixAll.getDLQTopic(groupName);
                queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                        DLQ_NUMS_PER_GROUP,
                        PermName.PERM_WRITE, 0
                );
                if (null == topicConfig) {
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("topic[" + newTopic + "] not exist");
                    return response;
                }
            }
        }
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic);
        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        msgInner.setPropertiesString(requestHeader.getProperties());
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));

        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(sysFlag);
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());

        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (traFlag != null) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
                return response;
            }
        }

        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        if (putMessageResult != null) {
            boolean sendOK = false;

            switch (putMessageResult.getPutMessageStatus()) {
                // Success
                case PUT_OK:
                    sendOK = true;
                    response.setCode(ResponseCode.SUCCESS);
                    break;
                case FLUSH_DISK_TIMEOUT:
                    response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
                    sendOK = true;
                    break;
                case FLUSH_SLAVE_TIMEOUT:
                    response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
                    sendOK = true;
                    break;
                case SLAVE_NOT_AVAILABLE:
                    response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
                    sendOK = true;
                    break;

                // Failed
                case CREATE_MAPEDFILE_FAILED:
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("create mapped file failed, server is busy or broken.");
                    break;
                case MESSAGE_ILLEGAL:
                case PROPERTIES_SIZE_EXCEEDED:
                    response.setCode(ResponseCode.MESSAGE_ILLEGAL);
                    response.setRemark(
                            "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
                    break;
                case SERVICE_NOT_AVAILABLE:
                    response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
                    response.setRemark(
                            "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
                    break;
                case OS_PAGECACHE_BUSY:
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
                    break;
                case UNKNOWN_ERROR:
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("UNKNOWN_ERROR");
                    break;
                default:
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("UNKNOWN_ERROR DEFAULT");
                    break;
            }

            String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
            if (sendOK) {

                this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
                this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
                        putMessageResult.getAppendMessageResult().getWroteBytes());
                this.brokerController.getBrokerStatsManager().incBrokerPutNums();

                response.setRemark(null);

                responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
                responseHeader.setQueueId(queueIdInt);
                responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());


                // The response is written here, before the trace context is filled in.
                doResponse(ctx, request, response);


                if (hasSendMessageHook()) {
                    sendMessageContext.setMsgId(responseHeader.getMsgId());
                    sendMessageContext.setQueueId(responseHeader.getQueueId());
                    sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());

                    int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
                    int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);

                    sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
                    sendMessageContext.setCommercialSendTimes(incValue);
                    sendMessageContext.setCommercialSendSize(wroteSize);
                    sendMessageContext.setCommercialOwner(owner);
                }
                // Already answered through doResponse; nothing left for the caller to write.
                return null;
            } else {
                if (hasSendMessageHook()) {
                    int wroteSize = request.getBody().length;
                    int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);

                    sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
                    sendMessageContext.setCommercialSendTimes(incValue);
                    sendMessageContext.setCommercialSendSize(wroteSize);
                    sendMessageContext.setCommercialOwner(owner);
                }
            }
        } else {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("store putMessage return null");
        }

        return response;
    }

    /**
     * Tells whether at least one consume-message hook has been registered.
     *
     * @return {@code true} when the hook list is non-null and non-empty
     */
    public boolean hasConsumeMessageHook() {
        return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
    }

    /**
     * Invokes {@code consumeMessageAfter} on every registered hook.
     *
     * <p>Hooks are best-effort: a failing hook must not fail the send-back request and must not
     * prevent the remaining hooks from running, so the failure is logged and the loop continues.
     *
     * @param context the consume context handed to each hook
     */
    public void executeConsumeMessageHookAfter(final ConsumeMessageContext context) {
        if (hasConsumeMessageHook()) {
            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                try {
                    hook.consumeMessageAfter(context);
                } catch (Throwable e) {
                    // Previously swallowed silently; keep going but leave a trace for diagnosis.
                    log.warn("consumeMessageAfter hook threw an exception, ignored", e);
                }
            }
        }
    }

    /** Returns the store host address stamped on messages written by this processor. */
    public SocketAddress getStoreHost() {
        return storeHost;
    }

    /**
     * Builds a short disk-usage summary for the commit log, consume queue and index paths,
     * used in the "service not available" remark.
     *
     * @return a string of the form {@code CL: x CQ: y INDEX: z}
     */
    private String diskUtil() {
        String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
        double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);

        String storePathLogis =
                StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
        double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogis);

        String storePathIndex =
                StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
        double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathIndex);

        return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio);
    }

    /**
     * Replaces the list of consume-message hooks used by this processor.
     *
     * @param consumeMessageHookList the hooks to use from now on (stored by reference, may be null)
     */
    public void registerConsumeMessageHook(List<ConsumeMessageHook> consumeMessageHookList) {
        this.consumeMessageHookList = consumeMessageHookList;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java new file mode 100644 index 0000000..8b3aefe --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package com.alibaba.rocketmq.broker.slave;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.subscription.SubscriptionGroupManager;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
import com.alibaba.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;


/**
 * Pulls broker metadata (topic configs, consumer offsets, delay offsets and subscription
 * groups) from the master address and applies it locally. Every sync step is a no-op while
 * no master address is set, and every step logs its own failures instead of propagating them.
 *
 * @author shijia.wxr
 * @author manhong.yqd
 */
public class SlaveSynchronize {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;
    private volatile String masterAddr = null;


    public SlaveSynchronize(BrokerController brokerController) {
        this.brokerController = brokerController;
    }


    public String getMasterAddr() {
        return masterAddr;
    }


    public void setMasterAddr(String masterAddr) {
        this.masterAddr = masterAddr;
    }


    /** Runs the four sync steps in order: topics, consumer offsets, delay offsets, subscription groups. */
    public void syncAll() {
        this.syncTopicConfig();
        this.syncConsumerOffset();
        this.syncDelayOffset();
        this.syncSubscriptionGroupConfig();
    }


    /**
     * Replaces the local topic config table with the master's when the data versions differ,
     * then persists it.
     */
    private void syncTopicConfig() {
        // Read the volatile field once so the whole step uses one consistent address.
        final String master = this.masterAddr;
        if (master == null) {
            return;
        }

        try {
            final TopicConfigSerializeWrapper remote =
                    this.brokerController.getBrokerOuterAPI().getAllTopicConfig(master);
            final boolean upToDate =
                    this.brokerController.getTopicConfigManager().getDataVersion().equals(remote.getDataVersion());
            if (upToDate) {
                return;
            }

            this.brokerController.getTopicConfigManager().getDataVersion().assignNewOne(remote.getDataVersion());
            this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
            this.brokerController.getTopicConfigManager().getTopicConfigTable().putAll(remote.getTopicConfigTable());
            this.brokerController.getTopicConfigManager().persist();

            log.info("update slave topic config from master, {}", master);
        } catch (Exception e) {
            log.error("syncTopicConfig Exception, " + master, e);
        }
    }


    /** Merges the master's consumer offset table into the local one and persists it. */
    private void syncConsumerOffset() {
        final String master = this.masterAddr;
        if (master == null) {
            return;
        }

        try {
            final ConsumerOffsetSerializeWrapper remote =
                    this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(master);
            this.brokerController.getConsumerOffsetManager().getOffsetTable().putAll(remote.getOffsetTable());
            this.brokerController.getConsumerOffsetManager().persist();
            log.info("update slave consumer offset from master, {}", master);
        } catch (Exception e) {
            log.error("syncConsumerOffset Exception, " + master, e);
        }
    }


    /** Fetches the master's delay offsets and, when present, writes them to the local store file. */
    private void syncDelayOffset() {
        final String master = this.masterAddr;
        if (master == null) {
            return;
        }

        try {
            final String remoteDelayOffset = this.brokerController.getBrokerOuterAPI().getAllDelayOffset(master);
            if (remoteDelayOffset != null) {
                this.persistDelayOffset(remoteDelayOffset);
            }
            log.info("update slave delay offset from master, {}", master);
        } catch (Exception e) {
            log.error("syncDelayOffset Exception, " + master, e);
        }
    }


    /**
     * Writes the delay offset content to the delay-offset store path under the configured
     * store root directory. An I/O failure is logged here and not rethrown.
     */
    private void persistDelayOffset(final String content) {
        final String targetFile = StorePathConfigHelper.getDelayOffsetStorePath(
                this.brokerController.getMessageStoreConfig().getStorePathRootDir());
        try {
            MixAll.string2File(content, targetFile);
        } catch (IOException e) {
            log.error("persist file Exception, " + targetFile, e);
        }
    }


    /**
     * Replaces the local subscription group table with the master's when the data versions
     * differ, then persists it.
     */
    private void syncSubscriptionGroupConfig() {
        final String master = this.masterAddr;
        if (master == null) {
            return;
        }

        try {
            final SubscriptionGroupWrapper remote =
                    this.brokerController.getBrokerOuterAPI().getAllSubscriptionGroupConfig(master);
            final SubscriptionGroupManager local = this.brokerController.getSubscriptionGroupManager();

            if (local.getDataVersion().equals(remote.getDataVersion())) {
                return;
            }

            local.getDataVersion().assignNewOne(remote.getDataVersion());
            local.getSubscriptionGroupTable().clear();
            local.getSubscriptionGroupTable().putAll(remote.getSubscriptionGroupTable());
            local.persist();
            log.info("update slave Subscription Group from master, {}", master);
        } catch (Exception e) {
            log.error("syncSubscriptionGroup Exception, " + master, e);
        }
    }
}
