http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java deleted file mode 100644 index ed10f1b..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java +++ /dev/null @@ -1,236 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package com.alibaba.rocketmq.broker.processor;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.common.TopicFilterType;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.message.MessageAccessor;
import com.alibaba.rocketmq.common.message.MessageConst;
import com.alibaba.rocketmq.common.message.MessageDecoder;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.ResponseCode;
import com.alibaba.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.store.MessageExtBrokerInner;
import com.alibaba.rocketmq.store.MessageStore;
import com.alibaba.rocketmq.store.PutMessageResult;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Handles a producer's end-transaction request: looks up the prepared message by its
 * commit-log offset, checks it against the request header, and stores a copy whose
 * transaction flag is reset to the requested commit/rollback value.
 *
 * @author shijia.wxr
 */
public class EndTransactionProcessor implements NettyRequestProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
    private final BrokerController brokerController;

    public EndTransactionProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * Processes one end-transaction request.
     *
     * @param ctx     channel context of the requesting producer (used for logging its address)
     * @param request the request carrying an {@link EndTransactionRequestHeader}
     * @return the response command, or {@code null} when the request is in pending state
     *         or carries an unrecognised commit/rollback value (no response is produced
     *         in those cases)
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final EndTransactionRequestHeader requestHeader =
            (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);

        if (requestHeader.getFromTransactionCheck()) {
            // The request is the answer to a broker-initiated transaction check.
            switch (requestHeader.getCommitOrRollback()) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("check producer[{}] transaction state, but it's pending status."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                }

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    LOGGER.warn("check producer[{}] transaction state, the producer commit the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());

                    break;
                }

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("check producer[{}] transaction state, the producer rollback the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        } else {
            // The producer ended the transaction itself, right after sending.
            switch (requestHeader.getCommitOrRollback()) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("the producer[{}] end transaction in sending message, and it's pending status."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                }

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    break;
                }

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("the producer[{}] end transaction in sending message, rollback the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        }

        final MessageStore messageStore = this.brokerController.getMessageStore();
        final MessageExt msgExt = messageStore.lookMessageByOffset(requestHeader.getCommitLogOffset());
        if (msgExt == null) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("find prepared transaction message failed");
            return response;
        }

        // A stored message without a producer-group property can never match the request;
        // answer with the regular mismatch error instead of failing on a null dereference.
        final String pgroupRead = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        if (pgroupRead == null || !pgroupRead.equals(requestHeader.getProducerGroup())) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("the producer group wrong");
            return response;
        }

        if (msgExt.getQueueOffset() != requestHeader.getTranStateTableOffset()) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("the transaction state table offset wrong");
            return response;
        }

        if (msgExt.getCommitLogOffset() != requestHeader.getCommitLogOffset()) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("the commit log offset wrong");
            return response;
        }

        final MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt);
        msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));

        msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
        msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
        msgInner.setStoreTimestamp(msgExt.getStoreTimestamp());
        if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            // A rollback record is stored without the message body.
            msgInner.setBody(null);
        }

        final PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
        if (putMessageResult == null) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("store putMessage return null");
            return response;
        }

        applyPutMessageResult(response, putMessageResult);
        return response;
    }

    @Override
    public boolean rejectRequest() {
        return false;
    }

    /**
     * Translates the store's put status into the response code and remark.
     */
    private static void applyPutMessageResult(final RemotingCommand response, final PutMessageResult putMessageResult) {
        switch (putMessageResult.getPutMessageStatus()) {
            // Success
            case PUT_OK:
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);
                break;
            // Failed
            case CREATE_MAPEDFILE_FAILED:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("create maped file failed.");
                break;
            case MESSAGE_ILLEGAL:
            case PROPERTIES_SIZE_EXCEEDED:
                response.setCode(ResponseCode.MESSAGE_ILLEGAL);
                response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
                break;
            case SERVICE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
                response.setRemark("service not available now.");
                break;
            case OS_PAGECACHE_BUSY:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("OS page cache busy, please try another machine");
                break;
            case UNKNOWN_ERROR:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UNKNOWN_ERROR");
                break;
            default:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UNKNOWN_ERROR DEFAULT");
                break;
        }
    }

    /**
     * Builds the broker-internal message that ends the transaction, copying body, flag,
     * properties, sys flag, timestamps, hosts, topic and queue id from the prepared message.
     *
     * @param msgExt the prepared message read back from the store
     * @return a new inner message; the caller still has to reset its transaction flag
     */
    private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());

        // Copy the sys flag before it is inspected: the MULTI_TAGS check below must see the
        // prepared message's flag, not the value of a freshly constructed inner message.
        msgInner.setSysFlag(msgExt.getSysFlag());

        TopicFilterType topicFilterType =
            (msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG
                ? TopicFilterType.MULTI_TAG
                : TopicFilterType.SINGLE_TAG;
        long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
        msgInner.setTagsCode(tagsCodeValue);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());

        msgInner.setWaitStoreMsgOK(false);
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

        msgInner.setTopic(msgExt.getTopic());
        msgInner.setQueueId(msgExt.getQueueId());

        return msgInner;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java deleted file mode 100644 index a92ead0..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package com.alibaba.rocketmq.broker.processor;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Request processor whose {@link #processRequest} always returns {@code null};
 * it never rejects a request.
 *
 * @author shijia.wxr
 */
public class ForwardRequestProcessor implements NettyRequestProcessor {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

    // Kept for the constructor contract; not read by any method in this class.
    private final BrokerController brokerController;

    public ForwardRequestProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean rejectRequest() {
        return false;
    }

    /**
     * Does no work on the request.
     *
     * @return always {@code null}
     */
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
        return null;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java deleted file mode 100644 index 1257f18..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java +++ /dev/null @@ -1,542 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package com.alibaba.rocketmq.broker.processor;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.client.ConsumerGroupInfo;
import com.alibaba.rocketmq.broker.longpolling.PullRequest;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageContext;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
import com.alibaba.rocketmq.broker.pagecache.ManyMessageTransfer;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.TopicConfig;
import com.alibaba.rocketmq.common.TopicFilterType;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.constant.PermName;
import com.alibaba.rocketmq.common.filter.FilterAPI;
import com.alibaba.rocketmq.common.help.FAQUrl;
import com.alibaba.rocketmq.common.message.MessageDecoder;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.ResponseCode;
import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.PullMessageResponseHeader;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import com.alibaba.rocketmq.common.protocol.topic.OffsetMovedEvent;
import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.common.RemotingUtil;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.store.GetMessageResult;
import com.alibaba.rocketmq.store.MessageExtBrokerInner;
import com.alibaba.rocketmq.store.PutMessageResult;
import com.alibaba.rocketmq.store.config.BrokerRole;
import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
import io.netty.channel.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.List;


/**
 * Serves consumer pull requests: validates permissions and subscription data, reads
 * messages from the store, and either answers immediately, streams the result through a
 * file region, or parks the request with the pull-request hold service.
 *
 * @author shijia.wxr
 */
public class PullMessageProcessor implements NettyRequestProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;
    private List<ConsumeMessageHook> consumeMessageHookList;

    public PullMessageProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * Entry point for a pull request arriving from the network; suspension is allowed.
     */
    @Override
    public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        return this.processRequest(ctx.channel(), request, true);
    }

    @Override
    public boolean rejectRequest() {
        return false;
    }

    /**
     * Handles one pull request.
     *
     * @param channel            channel of the pulling consumer
     * @param request            the pull request
     * @param brokerAllowSuspend whether a not-found result may park the request instead of
     *                           answering; {@code false} when re-run from
     *                           {@link #excuteRequestWhenWakeup}
     * @return the response to write back, or {@code null} when the response was already
     *         written through a file region or the request was suspended
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
        throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
        final PullMessageRequestHeader requestHeader =
            (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

        response.setOpaque(request.getOpaque());

        if (LOG.isDebugEnabled()) {
            LOG.debug("receive PullMessage request command, " + request);
        }

        // ---- permission and configuration checks --------------------------------------
        if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] pulling message is forbidden");
            return response;
        }

        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark("subscription group not exist, " + requestHeader.getConsumerGroup() + " "
                + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return response;
        }

        if (!subscriptionGroupConfig.isConsumeEnable()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
            return response;
        }

        final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
        final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
        final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());

        final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;

        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        if (null == topicConfig) {
            LOG.error("the topic " + requestHeader.getTopic() + " not exist, consumer: " + RemotingHelper.parseChannelRemoteAddr(channel));
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark(
                "topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
            return response;
        }

        if (!PermName.isReadable(topicConfig.getPerm())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
            return response;
        }

        if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
            String errorInfo = "queueId[" + requestHeader.getQueueId() + "] is illagal,Topic :" + requestHeader.getTopic()
                + " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();
            LOG.warn(errorInfo);
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(errorInfo);
            return response;
        }

        // ---- resolve the subscription data used for filtering -------------------------
        SubscriptionData subscriptionData = null;
        if (hasSubscriptionFlag) {
            // The request carries its own subscription expression.
            try {
                subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                    requestHeader.getSubscription());
            } catch (Exception e) {
                // Pass the exception as the trailing argument so the cause is not lost.
                LOG.warn("parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
                    requestHeader.getConsumerGroup(), e);
                response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
                response.setRemark("parse the consumer's subscription failed");
                return response;
            }
        } else {
            // Otherwise use what the consumer manager holds for this group.
            ConsumerGroupInfo consumerGroupInfo =
                this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
            if (null == consumerGroupInfo) {
                LOG.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
                response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
                response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
                return response;
            }

            if (!subscriptionGroupConfig.isConsumeBroadcastEnable() //
                && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
                return response;
            }

            subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
            if (null == subscriptionData) {
                LOG.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
                response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
                response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
                return response;
            }

            if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
                LOG.warn("the broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
                    subscriptionData.getSubString());
                response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
                response.setRemark("the consumer's subscription not latest");
                return response;
            }
        }

        // ---- read from the store -------------------------------------------------------
        final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);
        if (getMessageResult != null) {
            response.setRemark(getMessageResult.getStatus().name());
            responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
            responseHeader.setMinOffset(getMessageResult.getMinOffset());
            responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

            // NOTE(review): this suggestion is overwritten by the isSlaveReadEnable() block
            // further down, whose every branch sets the suggested broker id again.
            if (getMessageResult.isSuggestPullingFromSlave()) {
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            } else {
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }

            switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    break;
                case SLAVE:
                    if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                        // NOTE(review): the status switch below assigns the response code
                        // again for every status it handles — confirm this is intended.
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                    }
                    break;
            }

            if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                // consume too slow ,redirect to another machine
                if (getMessageResult.isSuggestPullingFromSlave()) {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
                }
                // consume ok
                else {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                }
            } else {
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }

            // ---- map the store status to a response code -------------------------------
            switch (getMessageResult.getStatus()) {
                case FOUND:
                    response.setCode(ResponseCode.SUCCESS);
                    break;
                case MESSAGE_WAS_REMOVING:
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    break;
                case NO_MATCHED_LOGIC_QUEUE:
                case NO_MESSAGE_IN_QUEUE:
                    if (0 != requestHeader.getQueueOffset()) {
                        response.setCode(ResponseCode.PULL_OFFSET_MOVED);

                        // XXX: warn and notify me
                        LOG.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
                            requestHeader.getQueueOffset(), //
                            getMessageResult.getNextBeginOffset(), //
                            requestHeader.getTopic(), //
                            requestHeader.getQueueId(), //
                            requestHeader.getConsumerGroup()//
                        );
                    } else {
                        response.setCode(ResponseCode.PULL_NOT_FOUND);
                    }
                    break;
                case NO_MATCHED_MESSAGE:
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    break;
                case OFFSET_FOUND_NULL:
                    response.setCode(ResponseCode.PULL_NOT_FOUND);
                    break;
                case OFFSET_OVERFLOW_BADLY:
                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                    // XXX: warn and notify me
                    LOG.info("the request offset: " + requestHeader.getQueueOffset() + " over flow badly, broker max offset: "
                        + getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress());
                    break;
                case OFFSET_OVERFLOW_ONE:
                    response.setCode(ResponseCode.PULL_NOT_FOUND);
                    break;
                case OFFSET_TOO_SMALL:
                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                    LOG.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
                        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
                        getMessageResult.getMinOffset(), channel.remoteAddress());
                    break;
                default:
                    assert false;
                    break;
            }

            // ---- run the registered consume-message hooks ------------------------------
            if (this.hasConsumeMessageHook()) {
                ConsumeMessageContext context = new ConsumeMessageContext();
                context.setConsumerGroup(requestHeader.getConsumerGroup());
                context.setTopic(requestHeader.getTopic());
                context.setQueueId(requestHeader.getQueueId());

                // Defensive: tolerate a request that carries no extension-field map.
                final String owner = request.getExtFields() == null
                    ? null
                    : request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);

                switch (response.getCode()) {
                    case ResponseCode.SUCCESS:
                        int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
                        int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;

                        context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
                        context.setCommercialRcvTimes(incValue);
                        context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
                        context.setCommercialOwner(owner);

                        break;
                    case ResponseCode.PULL_NOT_FOUND:
                        if (!brokerAllowSuspend) {
                            context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                            context.setCommercialRcvTimes(1);
                            context.setCommercialOwner(owner);
                        }
                        break;
                    case ResponseCode.PULL_RETRY_IMMEDIATELY:
                    case ResponseCode.PULL_OFFSET_MOVED:
                        context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                        context.setCommercialRcvTimes(1);
                        context.setCommercialOwner(owner);
                        break;
                    default:
                        assert false;
                        break;
                }

                this.executeConsumeMessageHookBefore(context);
            }

            // ---- act on the response code ----------------------------------------------
            switch (response.getCode()) {
                case ResponseCode.SUCCESS:

                    this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                        getMessageResult.getMessageCount());

                    this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                        getMessageResult.getBufferTotalSize());

                    this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
                    if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                        // Copy the messages into a heap array and return them as the body.
                        final long beginTimeMills = this.brokerController.getMessageStore().now();
                        final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
                        this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
                            requestHeader.getTopic(), requestHeader.getQueueId(),
                            (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
                        response.setBody(r);
                    } else {
                        // Write header and messages as a file region; the result is released
                        // when the write completes, or right here if starting it fails.
                        try {
                            FileRegion fileRegion =
                                new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
                            channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                                @Override
                                public void operationComplete(ChannelFuture future) throws Exception {
                                    getMessageResult.release();
                                    if (!future.isSuccess()) {
                                        LOG.error("transfer many message by pagecache failed, " + channel.remoteAddress(), future.cause());
                                    }
                                }
                            });
                        } catch (Throwable e) {
                            LOG.error("transfer many message by pagecache exception", e);
                            getMessageResult.release();
                        }

                        // The response has already been written (or failed); nothing to return.
                        response = null;
                    }
                    break;
                case ResponseCode.PULL_NOT_FOUND:

                    if (brokerAllowSuspend && hasSuspendFlag) {
                        long pollingTimeMills = suspendTimeoutMillisLong;
                        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                        }

                        String topic = requestHeader.getTopic();
                        long offset = requestHeader.getQueueOffset();
                        int queueId = requestHeader.getQueueId();
                        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                            this.brokerController.getMessageStore().now(), offset, subscriptionData);
                        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                        response = null;
                        break;
                    }

                    // Not suspended: falls through and answers PULL_NOT_FOUND as is.
                case ResponseCode.PULL_RETRY_IMMEDIATELY:
                    break;
                case ResponseCode.PULL_OFFSET_MOVED:
                    if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
                        || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
                        MessageQueue mq = new MessageQueue();
                        mq.setTopic(requestHeader.getTopic());
                        mq.setQueueId(requestHeader.getQueueId());
                        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());

                        OffsetMovedEvent event = new OffsetMovedEvent();
                        event.setConsumerGroup(requestHeader.getConsumerGroup());
                        event.setMessageQueue(mq);
                        event.setOffsetRequest(requestHeader.getQueueOffset());
                        event.setOffsetNew(getMessageResult.getNextBeginOffset());
                        this.generateOffsetMovedEvent(event);
                        LOG.warn(
                            "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
                            requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
                            responseHeader.getSuggestWhichBrokerId());
                    } else {
                        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        LOG.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
                            requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
                            responseHeader.getSuggestWhichBrokerId());
                    }

                    break;
                default:
                    assert false;
            }
        } else {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("store getMessage return null");
        }

        // ---- commit the consumer's offset if requested ---------------------------------
        boolean storeOffsetEnable = brokerAllowSuspend;
        storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
        storeOffsetEnable = storeOffsetEnable
            && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
        if (storeOffsetEnable) {
            this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
                requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
        }
        return response;
    }


    /**
     * @return {@code true} when at least one consume-message hook is registered
     */
    public boolean hasConsumeMessageHook() {
        return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
    }

    /**
     * Invokes {@code consumeMessageBefore} on every registered hook. A failing hook is
     * logged and skipped so that it cannot break the pull request or the remaining hooks.
     *
     * @param context the context handed to each hook
     */
    public void executeConsumeMessageHookBefore(final ConsumeMessageContext context) {
        if (hasConsumeMessageHook()) {
            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                try {
                    hook.consumeMessageBefore(context);
                } catch (Throwable e) {
                    // Best effort: keep going, but leave a trace of the failure.
                    LOG.warn("consumeMessageBefore hook failed", e);
                }
            }
        }
    }

    /**
     * Copies all message buffers of the result into one heap array, releases the result,
     * and records how far the last copied message's store timestamp lags behind the
     * store's current time.
     *
     * @return the concatenated message bytes
     */
    private byte[] readGetMessageResult(final GetMessageResult getMessageResult, final String group, final String topic, final int queueId) {
        final ByteBuffer byteBuffer = ByteBuffer.allocate(getMessageResult.getBufferTotalSize());

        long storeTimestamp = 0;
        try {
            List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList();
            for (ByteBuffer bb : messageBufferList) {

                byteBuffer.put(bb);
                // Absolute read: unaffected by the position that put() just advanced.
                storeTimestamp = bb.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
            }
        } finally {
            getMessageResult.release();
        }

        this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId, this.brokerController.getMessageStore().now() - storeTimestamp);
        return byteBuffer.array();
    }

    /**
     * Stores an offset-moved event as a message on the {@code MixAll.OFFSET_MOVED_EVENT}
     * topic. Failures are logged and not propagated.
     */
    private void generateOffsetMovedEvent(final OffsetMovedEvent event) {
        try {
            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
            msgInner.setTopic(MixAll.OFFSET_MOVED_EVENT);
            msgInner.setTags(event.getConsumerGroup());
            msgInner.setDelayTimeLevel(0);
            msgInner.setKeys(event.getConsumerGroup());
            msgInner.setBody(event.encode());
            msgInner.setFlag(0);
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
            msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(TopicFilterType.SINGLE_TAG, msgInner.getTags()));

            msgInner.setQueueId(0);
            msgInner.setSysFlag(0);
            msgInner.setBornTimestamp(System.currentTimeMillis());
            msgInner.setBornHost(RemotingUtil.string2SocketAddress(this.brokerController.getBrokerAddr()));
            msgInner.setStoreHost(msgInner.getBornHost());

            msgInner.setReconsumeTimes(0);

            // The put result is not inspected.
            this.brokerController.getMessageStore().putMessage(msgInner);
        } catch (Exception e) {
            LOG.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e);
        }
    }

    /**
     * Re-runs a previously held pull request on the pull-message executor, with
     * suspension disabled, and writes the response (if any) back to the channel.
     *
     * @param channel the consumer's channel
     * @param request the original pull request
     */
    public void excuteRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);

                    if (response != null) {
                        response.setOpaque(request.getOpaque());
                        response.markResponseType();
                        try {
                            channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
                                @Override
                                public void operationComplete(ChannelFuture future) throws Exception {
                                    if (!future.isSuccess()) {
                                        LOG.error("processRequestWrapper response to " + future.channel().remoteAddress() + " failed",
                                            future.cause());
                                        LOG.error(request.toString());
                                        LOG.error(response.toString());
                                    }
                                }
                            });
                        } catch (Throwable e) {
                            LOG.error("processRequestWrapper process request over, but response failed", e);
                            LOG.error(request.toString());
                            LOG.error(response.toString());
                        }
                    }
                } catch (RemotingCommandException e1) {
                    LOG.error("excuteRequestWhenWakeup run", e1);
                }
            }
        };

        this.brokerController.getPullMessageExecutor().submit(run);
    }

    /**
     * Replaces the list of hooks invoked by {@link #executeConsumeMessageHookBefore}.
     */
    public void registerConsumeMessageHook(List<ConsumeMessageHook> sendMessageHookList) {
        this.consumeMessageHookList = sendMessageHookList;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java deleted file mode 100644 index 738d11f..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package com.alibaba.rocketmq.broker.processor;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.pagecache.OneMessageTransfer;
import com.alibaba.rocketmq.broker.pagecache.QueryMessageTransfer;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.protocol.RequestCode;
import com.alibaba.rocketmq.common.protocol.ResponseCode;
import com.alibaba.rocketmq.common.protocol.header.QueryMessageRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.QueryMessageResponseHeader;
import com.alibaba.rocketmq.common.protocol.header.ViewMessageRequestHeader;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.store.QueryMessageResult;
import com.alibaba.rocketmq.store.SelectMappedBufferResult;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Handles {@code QUERY_MESSAGE} and {@code VIEW_MESSAGE_BY_ID} requests.
 *
 * <p>When a lookup finds data, the handler writes the encoded response header together with the
 * store buffers to the channel itself and returns {@code null}; only "not found" style outcomes are
 * returned as a {@link RemotingCommand} for the caller to send.
 *
 * @author shijia.wxr
 */
public class QueryMessageProcessor implements NettyRequestProcessor {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

    private final BrokerController brokerController;


    public QueryMessageProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }


    /**
     * Dispatches on the request code.
     *
     * @return the response to send, or {@code null} when the handler already wrote the reply to the
     *         channel or when the request code is not one this processor knows
     */
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.QUERY_MESSAGE:
                return this.queryMessage(ctx, request);
            case RequestCode.VIEW_MESSAGE_BY_ID:
                return this.viewMessageById(ctx, request);
            default:
                break;
        }

        return null;
    }

    @Override
    public boolean rejectRequest() {
        return false;
    }


    /**
     * Queries the message store by topic, key and time range.
     *
     * @param ctx     channel context the result is written to when messages are found
     * @param request the incoming command carrying a {@link QueryMessageRequestHeader}
     * @return {@code null} when at least one byte of result data was found (the reply is written
     *         to the channel here), otherwise a {@code QUERY_NOT_FOUND} response
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        final RemotingCommand response =
                RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class);
        final QueryMessageResponseHeader responseHeader =
                (QueryMessageResponseHeader) response.readCustomHeader();
        final QueryMessageRequestHeader requestHeader =
                (QueryMessageRequestHeader) request
                        .decodeCommandCustomHeader(QueryMessageRequestHeader.class);

        response.setOpaque(request.getOpaque());

        // A unique-key query ignores the client supplied limit and uses the broker default instead.
        final String isUniqueKey = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG);
        if ("true".equals(isUniqueKey)) {
            requestHeader.setMaxNum(this.brokerController.getMessageStoreConfig().getDefaultQueryMaxNum());
        }

        final QueryMessageResult queryMessageResult =
                this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(),
                        requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(),
                        requestHeader.getEndTimestamp());
        assert queryMessageResult != null;

        responseHeader.setIndexLastUpdatePhyoffset(queryMessageResult.getIndexLastUpdatePhyoffset());
        responseHeader.setIndexLastUpdateTimestamp(queryMessageResult.getIndexLastUpdateTimestamp());

        if (queryMessageResult.getBufferTotalSize() <= 0) {
            response.setCode(ResponseCode.QUERY_NOT_FOUND);
            response.setRemark("can not find message, maybe time range not correct");
            return response;
        }

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);

        try {
            final FileRegion fileRegion =
                    new QueryMessageTransfer(response.encodeHeader(queryMessageResult.getBufferTotalSize()),
                            queryMessageResult);
            ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // The store buffers are held until the write has finished, successfully or not.
                    queryMessageResult.release();
                    if (!future.isSuccess()) {
                        log.error("transfer query message by pagecache failed, ", future.cause());
                    }
                }
            });
        } catch (Throwable e) {
            // Previously logged with an empty message; include what was being served.
            log.error("queryMessage: writing query result to channel failed, topic=" + requestHeader.getTopic()
                    + ", key=" + requestHeader.getKey(), e);
            queryMessageResult.release();
        }

        // The reply, if any, has been handed to the channel above.
        return null;
    }


    /**
     * Looks up a single message by the offset carried in the request header.
     *
     * @param ctx     channel context the message is written to when it is found
     * @param request the incoming command carrying a {@link ViewMessageRequestHeader}
     * @return {@code null} when the message was found (the reply is written to the channel here),
     *         otherwise a {@code SYSTEM_ERROR} response naming the offset
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final ViewMessageRequestHeader requestHeader =
                (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);

        response.setOpaque(request.getOpaque());

        final SelectMappedBufferResult selectMappedBufferResult =
                this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
        if (selectMappedBufferResult == null) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("can not find message by the offset, " + requestHeader.getOffset());
            return response;
        }

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);

        try {
            final FileRegion fileRegion =
                    new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),
                            selectMappedBufferResult);
            ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // The mapped buffer is held until the write has finished, successfully or not.
                    selectMappedBufferResult.release();
                    if (!future.isSuccess()) {
                        log.error("transfer one message by pagecache failed, ", future.cause());
                    }
                }
            });
        } catch (Throwable e) {
            // Previously logged with an empty message; include what was being served.
            log.error("viewMessageById: writing message to channel failed, offset=" + requestHeader.getOffset(), e);
            selectMappedBufferResult.release();
        }

        // The reply, if any, has been handed to the channel above.
        return null;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java deleted file mode 100644 index a375285..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java +++ /dev/null @@ -1,497 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.broker.processor; - -import com.alibaba.rocketmq.broker.BrokerController; -import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageContext; -import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook; -import com.alibaba.rocketmq.broker.mqtrace.SendMessageContext; -import com.alibaba.rocketmq.common.*; -import com.alibaba.rocketmq.common.constant.PermName; -import com.alibaba.rocketmq.common.help.FAQUrl; -import com.alibaba.rocketmq.common.message.MessageAccessor; -import com.alibaba.rocketmq.common.message.MessageConst; -import com.alibaba.rocketmq.common.message.MessageDecoder; -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.common.protocol.RequestCode; -import com.alibaba.rocketmq.common.protocol.ResponseCode; -import com.alibaba.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.SendMessageResponseHeader; -import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig; -import com.alibaba.rocketmq.common.sysflag.MessageSysFlag; -import com.alibaba.rocketmq.common.sysflag.TopicSysFlag; -import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; -import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor; -import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; -import com.alibaba.rocketmq.store.MessageExtBrokerInner; -import com.alibaba.rocketmq.store.PutMessageResult; -import com.alibaba.rocketmq.store.config.StorePathConfigHelper; -import com.alibaba.rocketmq.store.stats.BrokerStatsManager; -import io.netty.channel.ChannelHandlerContext; - -import java.net.SocketAddress; -import java.util.List; - - -/** - * @author shijia.wxr - */ -public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { - - private List<ConsumeMessageHook> consumeMessageHookList; - - public 
SendMessageProcessor(final BrokerController brokerController) { - super(brokerController); - } - - @Override - public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - SendMessageContext mqtraceContext; - switch (request.getCode()) { - case RequestCode.CONSUMER_SEND_MSG_BACK: - return this.consumerSendMsgBack(ctx, request); - default: - SendMessageRequestHeader requestHeader = parseRequestHeader(request); - if (requestHeader == null) { - return null; - } - - mqtraceContext = buildMsgContext(ctx, requestHeader); - this.executeSendMessageHookBefore(ctx, request, mqtraceContext); - final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); - - this.executeSendMessageHookAfter(response, mqtraceContext); - return response; - } - } - - @Override - public boolean rejectRequest() { - return this.brokerController.getMessageStore().isOSPageCacheBusy() || - this.brokerController.getMessageStore().isTransientStorePoolDeficient(); - } - - private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) - throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - final ConsumerSendMsgBackRequestHeader requestHeader = - (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); - - if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) { - - ConsumeMessageContext context = new ConsumeMessageContext(); - context.setConsumerGroup(requestHeader.getGroup()); - context.setTopic(requestHeader.getOriginTopic()); - context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK); - context.setCommercialRcvTimes(1); - context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER)); - - this.executeConsumeMessageHookAfter(context); - } - - - SubscriptionGroupConfig 
subscriptionGroupConfig = - this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup()); - if (null == subscriptionGroupConfig) { - response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); - response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " " - + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); - return response; - } - - - if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) { - response.setCode(ResponseCode.NO_PERMISSION); - response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden"); - return response; - } - - - if (subscriptionGroupConfig.getRetryQueueNums() <= 0) { - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - String newTopic = MixAll.getRetryTopic(requestHeader.getGroup()); - int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums(); - - - int topicSysFlag = 0; - if (requestHeader.isUnitMode()) { - topicSysFlag = TopicSysFlag.buildSysFlag(false, true); - } - - - TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(// - newTopic, // - subscriptionGroupConfig.getRetryQueueNums(), // - PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); - if (null == topicConfig) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("topic[" + newTopic + "] not exist"); - return response; - } - - - if (!PermName.isWriteable(topicConfig.getPerm())) { - response.setCode(ResponseCode.NO_PERMISSION); - response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic)); - return response; - } - - MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset()); - if (null == msgExt) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("look 
message by offset failed, " + requestHeader.getOffset()); - return response; - } - - - final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); - if (null == retryTopic) { - MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic()); - } - msgExt.setWaitStoreMsgOK(false); - - - int delayLevel = requestHeader.getDelayLevel(); - - - int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); - if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { - maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); - } - - - if (msgExt.getReconsumeTimes() >= maxReconsumeTimes// - || delayLevel < 0) { - newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); - queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; - - topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, // - DLQ_NUMS_PER_GROUP, // - PermName.PERM_WRITE, 0 - ); - if (null == topicConfig) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("topic[" + newTopic + "] not exist"); - return response; - } - } else { - if (0 == delayLevel) { - delayLevel = 3 + msgExt.getReconsumeTimes(); - } - - msgExt.setDelayTimeLevel(delayLevel); - } - - MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); - msgInner.setTopic(newTopic); - msgInner.setBody(msgExt.getBody()); - msgInner.setFlag(msgExt.getFlag()); - MessageAccessor.setProperties(msgInner, msgExt.getProperties()); - msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); - msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags())); - - msgInner.setQueueId(queueIdInt); - msgInner.setSysFlag(msgExt.getSysFlag()); - msgInner.setBornTimestamp(msgExt.getBornTimestamp()); - msgInner.setBornHost(msgExt.getBornHost()); - msgInner.setStoreHost(this.getStoreHost()); - msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1); - - String 
originMsgId = MessageAccessor.getOriginMessageId(msgExt); - MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId); - - PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); - if (putMessageResult != null) { - switch (putMessageResult.getPutMessageStatus()) { - case PUT_OK: - String backTopic = msgExt.getTopic(); - String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); - if (correctTopic != null) { - backTopic = correctTopic; - } - - this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic); - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - - return response; - default: - break; - } - - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(putMessageResult.getPutMessageStatus().name()); - return response; - } - - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("putMessageResult is null"); - return response; - } - - private RemotingCommand sendMessage(final ChannelHandlerContext ctx, // - final RemotingCommand request, // - final SendMessageContext sendMessageContext, // - final SendMessageRequestHeader requestHeader) throws RemotingCommandException { - - final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); - final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader(); - - - response.setOpaque(request.getOpaque()); - - response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); - response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); - - if (log.isDebugEnabled()) { - log.debug("receive SendMessage request command, " + request); - } - - final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); 
- if (this.brokerController.getMessageStore().now() < startTimstamp) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); - return response; - } - - response.setCode(-1); - super.msgCheck(ctx, requestHeader, response); - if (response.getCode() != -1) { - return response; - } - - final byte[] body = request.getBody(); - - int queueIdInt = requestHeader.getQueueId(); - TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); - - if (queueIdInt < 0) { - queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); - } - - int sysFlag = requestHeader.getSysFlag(); - - if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { - sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; - } - - String newTopic = requestHeader.getTopic(); - if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); - SubscriptionGroupConfig subscriptionGroupConfig = - this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); - if (null == subscriptionGroupConfig) { - response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); - response.setRemark( - "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); - return response; - } - - - int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); - if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { - maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); - } - int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 
0 : requestHeader.getReconsumeTimes(); - if (reconsumeTimes >= maxReconsumeTimes) { - newTopic = MixAll.getDLQTopic(groupName); - queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; - topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, // - DLQ_NUMS_PER_GROUP, // - PermName.PERM_WRITE, 0 - ); - if (null == topicConfig) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("topic[" + newTopic + "] not exist"); - return response; - } - } - } - MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); - msgInner.setTopic(newTopic); - msgInner.setBody(body); - msgInner.setFlag(requestHeader.getFlag()); - MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); - msgInner.setPropertiesString(requestHeader.getProperties()); - msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags())); - - msgInner.setQueueId(queueIdInt); - msgInner.setSysFlag(sysFlag); - msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); - msgInner.setBornHost(ctx.channel().remoteAddress()); - msgInner.setStoreHost(this.getStoreHost()); - msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 
0 : requestHeader.getReconsumeTimes()); - - if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { - String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); - if (traFlag != null) { - response.setCode(ResponseCode.NO_PERMISSION); - response.setRemark( - "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); - return response; - } - } - - PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); - if (putMessageResult != null) { - boolean sendOK = false; - - switch (putMessageResult.getPutMessageStatus()) { - // Success - case PUT_OK: - sendOK = true; - response.setCode(ResponseCode.SUCCESS); - break; - case FLUSH_DISK_TIMEOUT: - response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT); - sendOK = true; - break; - case FLUSH_SLAVE_TIMEOUT: - response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT); - sendOK = true; - break; - case SLAVE_NOT_AVAILABLE: - response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE); - sendOK = true; - break; - - // Failed - case CREATE_MAPEDFILE_FAILED: - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("create mapped file failed, server is busy or broken."); - break; - case MESSAGE_ILLEGAL: - case PROPERTIES_SIZE_EXCEEDED: - response.setCode(ResponseCode.MESSAGE_ILLEGAL); - response.setRemark( - "the message is illegal, maybe msg body or properties length not matched. 
msg body length limit 128k, msg properties length limit 32k."); - break; - case SERVICE_NOT_AVAILABLE: - response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); - response.setRemark( - "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small."); - break; - case OS_PAGECACHE_BUSY: - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while"); - break; - case UNKNOWN_ERROR: - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("UNKNOWN_ERROR"); - break; - default: - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("UNKNOWN_ERROR DEFAULT"); - break; - } - - String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER); - if (sendOK) { - - this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic()); - this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), - putMessageResult.getAppendMessageResult().getWroteBytes()); - this.brokerController.getBrokerStatsManager().incBrokerPutNums(); - - response.setRemark(null); - - responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); - responseHeader.setQueueId(queueIdInt); - responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); - - - doResponse(ctx, request, response); - - - if (hasSendMessageHook()) { - sendMessageContext.setMsgId(responseHeader.getMsgId()); - sendMessageContext.setQueueId(responseHeader.getQueueId()); - sendMessageContext.setQueueOffset(responseHeader.getQueueOffset()); - - int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount(); - int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes(); - int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount; - - sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS); - 
sendMessageContext.setCommercialSendTimes(incValue); - sendMessageContext.setCommercialSendSize(wroteSize); - sendMessageContext.setCommercialOwner(owner); - } - return null; - } else { - if (hasSendMessageHook()) { - int wroteSize = request.getBody().length; - int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT); - - sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE); - sendMessageContext.setCommercialSendTimes(incValue); - sendMessageContext.setCommercialSendSize(wroteSize); - sendMessageContext.setCommercialOwner(owner); - } - } - } else { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("store putMessage return null"); - } - - return response; - } - - public boolean hasConsumeMessageHook() { - return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty(); - } - - public void executeConsumeMessageHookAfter(final ConsumeMessageContext context) { - if (hasConsumeMessageHook()) { - for (ConsumeMessageHook hook : this.consumeMessageHookList) { - try { - hook.consumeMessageAfter(context); - } catch (Throwable e) { - } - } - } - } - - public SocketAddress getStoreHost() { - return storeHost; - } - - private String diskUtil() { - String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog(); - double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); - - String storePathLogis = - StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); - double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogis); - - String storePathIndex = - StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); - double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathIndex); - - return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio); - } - - public void 
registerConsumeMessageHook(List<ConsumeMessageHook> consumeMessageHookList) { - this.consumeMessageHookList = consumeMessageHookList; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java deleted file mode 100644 index 8b3aefe..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package com.alibaba.rocketmq.broker.slave;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.subscription.SubscriptionGroupManager;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
import com.alibaba.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;


/**
 * Copies broker metadata (topic configs, consumer offsets, delay offsets and subscription group
 * configs) from the master address into this broker's local managers and persists it.
 *
 * <p>Every sync step is a no-op while no master address is set, and every step catches and logs
 * its own exceptions so that one failing step does not prevent the others from running.
 *
 * @author shijia.wxr
 * @author manhong.yqd
 */
public class SlaveSynchronize {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;
    // volatile: read by the sync methods, written through setMasterAddr.
    private volatile String masterAddr = null;


    public SlaveSynchronize(BrokerController brokerController) {
        this.brokerController = brokerController;
    }


    public String getMasterAddr() {
        return masterAddr;
    }


    public void setMasterAddr(String masterAddr) {
        this.masterAddr = masterAddr;
    }


    /**
     * Runs the four sync steps in order: topic config, consumer offset, delay offset,
     * subscription group config.
     */
    public void syncAll() {
        this.syncTopicConfig();
        this.syncConsumerOffset();
        this.syncDelayOffset();
        this.syncSubscriptionGroupConfig();
    }


    /**
     * Replaces the local topic config table with the master's when the data versions differ.
     */
    private void syncTopicConfig() {
        // Snapshot the volatile field so the whole step talks to one address.
        final String masterAddrBak = this.masterAddr;
        if (masterAddrBak == null) {
            return;
        }

        try {
            TopicConfigSerializeWrapper topicWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
            if (!this.brokerController.getTopicConfigManager().getDataVersion()
                    .equals(topicWrapper.getDataVersion())) {

                this.brokerController.getTopicConfigManager().getDataVersion()
                        .assignNewOne(topicWrapper.getDataVersion());
                this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
                this.brokerController.getTopicConfigManager().getTopicConfigTable()
                        .putAll(topicWrapper.getTopicConfigTable());
                this.brokerController.getTopicConfigManager().persist();

                log.info("update slave topic config from master, {}", masterAddrBak);
            }
        } catch (Exception e) {
            log.error("syncTopicConfig Exception, " + masterAddrBak, e);
        }
    }


    /**
     * Merges the master's consumer offset table into the local one and persists it.
     * Existing local entries are overwritten, not cleared first.
     */
    private void syncConsumerOffset() {
        final String masterAddrBak = this.masterAddr;
        if (masterAddrBak == null) {
            return;
        }

        try {
            ConsumerOffsetSerializeWrapper offsetWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
            this.brokerController.getConsumerOffsetManager().getOffsetTable()
                    .putAll(offsetWrapper.getOffsetTable());
            this.brokerController.getConsumerOffsetManager().persist();
            log.info("update slave consumer offset from master, {}", masterAddrBak);
        } catch (Exception e) {
            log.error("syncConsumerOffset Exception, " + masterAddrBak, e);
        }
    }


    /**
     * Fetches the master's delay offset string and writes it to the local delay offset file.
     *
     * <p>The success line is logged only after the file has actually been written; it used to be
     * logged even when the master returned nothing or the write failed.
     */
    private void syncDelayOffset() {
        final String masterAddrBak = this.masterAddr;
        if (masterAddrBak == null) {
            return;
        }

        try {
            String delayOffset =
                    this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
            if (delayOffset != null) {
                String fileName =
                        StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
                                .getMessageStoreConfig().getStorePathRootDir());
                try {
                    MixAll.string2File(delayOffset, fileName);
                    log.info("update slave delay offset from master, {}", masterAddrBak);
                } catch (IOException e) {
                    log.error("persist file Exception, " + fileName, e);
                }
            }
        } catch (Exception e) {
            log.error("syncDelayOffset Exception, " + masterAddrBak, e);
        }
    }


    /**
     * Replaces the local subscription group table with the master's when the data versions differ.
     */
    private void syncSubscriptionGroupConfig() {
        final String masterAddrBak = this.masterAddr;
        if (masterAddrBak == null) {
            return;
        }

        try {
            SubscriptionGroupWrapper subscriptionWrapper =
                    this.brokerController.getBrokerOuterAPI()
                            .getAllSubscriptionGroupConfig(masterAddrBak);

            if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
                    .equals(subscriptionWrapper.getDataVersion())) {
                SubscriptionGroupManager subscriptionGroupManager =
                        this.brokerController.getSubscriptionGroupManager();
                subscriptionGroupManager.getDataVersion().assignNewOne(
                        subscriptionWrapper.getDataVersion());
                subscriptionGroupManager.getSubscriptionGroupTable().clear();
                subscriptionGroupManager.getSubscriptionGroupTable().putAll(
                        subscriptionWrapper.getSubscriptionGroupTable());
                subscriptionGroupManager.persist();
                log.info("update slave Subscription Group from master, {}", masterAddrBak);
            }
        } catch (Exception e) {
            log.error("syncSubscriptionGroup Exception, " + masterAddrBak, e);
        }
    }
}
