http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java new file mode 100644 index 0000000..32af402 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * + */ +package org.apache.rocketmq.broker.plugin; + +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.store.MessageArrivingListener; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.stats.BrokerStatsManager; + +public class MessageStorePluginContext { + private MessageStoreConfig messageStoreConfig; + private BrokerStatsManager brokerStatsManager; + private MessageArrivingListener messageArrivingListener; + private BrokerConfig brokerConfig; + + public MessageStorePluginContext(MessageStoreConfig messageStoreConfig, + BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener, + BrokerConfig brokerConfig) { + super(); + this.messageStoreConfig = messageStoreConfig; + this.brokerStatsManager = brokerStatsManager; + this.messageArrivingListener = messageArrivingListener; + this.brokerConfig = brokerConfig; + } + + public MessageStoreConfig getMessageStoreConfig() { + return messageStoreConfig; + } + + public BrokerStatsManager getBrokerStatsManager() { + return brokerStatsManager; + } + + public MessageArrivingListener getMessageArrivingListener() { + return messageArrivingListener; + } + + public BrokerConfig getBrokerConfig() { + return brokerConfig; + } + + +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java new file mode 100644 index 0000000..3cf28b3 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.processor; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.mqtrace.SendMessageContext; +import org.apache.rocketmq.broker.mqtrace.SendMessageHook; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.TopicFilterType; +import org.apache.rocketmq.common.constant.DBMsgConstants; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.help.FAQUrl; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; +import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; +import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.common.sysflag.TopicSysFlag; +import org.apache.rocketmq.common.utils.ChannelUtil; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.store.MessageExtBrokerInner; +import io.netty.channel.ChannelHandlerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import java.util.Map; +import java.util.Random; + + +/** + * @author shijia.wxr + */ +public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor { + protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + + protected final static int DLQ_NUMS_PER_GROUP = 1; + protected final BrokerController brokerController; + protected final Random random = new Random(System.currentTimeMillis()); + protected final SocketAddress storeHost; + private List<SendMessageHook> sendMessageHookList; + + + public AbstractSendMessageProcessor(final BrokerController brokerController) { + this.brokerController = brokerController; + this.storeHost = + new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController + .getNettyServerConfig().getListenPort()); + } + + protected SendMessageContext buildMsgContext(ChannelHandlerContext ctx, + SendMessageRequestHeader requestHeader) { + if (!this.hasSendMessageHook()) { + return null; + } + SendMessageContext mqtraceContext; + mqtraceContext = new SendMessageContext(); + mqtraceContext.setProducerGroup(requestHeader.getProducerGroup()); + mqtraceContext.setTopic(requestHeader.getTopic()); + mqtraceContext.setMsgProps(requestHeader.getProperties()); + mqtraceContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + mqtraceContext.setBrokerAddr(this.brokerController.getBrokerAddr()); + mqtraceContext.setBrokerRegionId(this.brokerController.getBrokerConfig().getRegionId()); + mqtraceContext.setBornTimeStamp(requestHeader.getBornTimestamp()); + + Map<String, String> properties = MessageDecoder.string2messageProperties(requestHeader.getProperties()); + String uniqueKey = properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); + properties.put(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); + properties.put(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); + requestHeader.setProperties(MessageDecoder.messageProperties2String(properties)); + + + if (uniqueKey == null) { + uniqueKey = ""; + } + mqtraceContext.setMsgUniqueKey(uniqueKey); + return mqtraceContext; + } + + public boolean hasSendMessageHook() { + return sendMessageHookList != null && !this.sendMessageHookList.isEmpty(); + } + + protected MessageExtBrokerInner buildInnerMsg(final ChannelHandlerContext ctx, + final SendMessageRequestHeader requestHeader, final byte[] body, TopicConfig topicConfig) { + int queueIdInt = requestHeader.getQueueId(); + if (queueIdInt < 0) { + queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); + } + int sysFlag = requestHeader.getSysFlag(); + + if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { + sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; + } + + MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); + msgInner.setTopic(requestHeader.getTopic()); + msgInner.setBody(body); + msgInner.setFlag(requestHeader.getFlag()); + MessageAccessor.setProperties(msgInner, + MessageDecoder.string2messageProperties(requestHeader.getProperties())); + msgInner.setPropertiesString(requestHeader.getProperties()); + msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), + msgInner.getTags())); + + msgInner.setQueueId(queueIdInt); + msgInner.setSysFlag(sysFlag); + msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); + msgInner.setBornHost(ctx.channel().remoteAddress()); + msgInner.setStoreHost(this.getStoreHost()); + msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader + .getReconsumeTimes()); + return msgInner; + } + + public SocketAddress getStoreHost() { + return storeHost; + } + + protected RemotingCommand msgContentCheck(final ChannelHandlerContext ctx, + final SendMessageRequestHeader requestHeader, RemotingCommand request, + final RemotingCommand response) { + if (requestHeader.getTopic().length() > Byte.MAX_VALUE) { + log.warn("putMessage message topic length too long " + requestHeader.getTopic().length()); + response.setCode(ResponseCode.MESSAGE_ILLEGAL); + return response; + } + if (requestHeader.getProperties() != null && requestHeader.getProperties().length() > Short.MAX_VALUE) { + log.warn("putMessage message properties length too long " + + requestHeader.getProperties().length()); + response.setCode(ResponseCode.MESSAGE_ILLEGAL); + return response; + } + if (request.getBody().length > DBMsgConstants.MAX_BODY_SIZE) { + log.warn(" topic {} msg body size {} from {}", requestHeader.getTopic(), + request.getBody().length, ChannelUtil.getRemoteIp(ctx.channel())); + response.setRemark("msg body must be less 64KB"); + response.setCode(ResponseCode.MESSAGE_ILLEGAL); + return response; + } + return response; + } + + protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, + final SendMessageRequestHeader requestHeader, final RemotingCommand response) { + if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission()) + && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) { + response.setCode(ResponseCode.NO_PERMISSION); + response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + + "] sending message is forbidden"); + return response; + } + if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) { + String errorMsg = + "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; + log.warn(errorMsg); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(errorMsg); + return response; + } + + TopicConfig topicConfig = + this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); + if (null == topicConfig) { + int topicSysFlag = 0; + if (requestHeader.isUnitMode()) { + if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + topicSysFlag = TopicSysFlag.buildSysFlag(false, true); + } else { + topicSysFlag = TopicSysFlag.buildSysFlag(true, false); + } + } + + log.warn("the topic " + requestHeader.getTopic() + " not exist, producer: " + + ctx.channel().remoteAddress()); + topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(// + requestHeader.getTopic(), // + requestHeader.getDefaultTopic(), // + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), // + requestHeader.getDefaultTopicQueueNums(), topicSysFlag); + + if (null == topicConfig) { + if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + topicConfig = + this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( + requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ, + topicSysFlag); + } + } + + if (null == topicConfig) { + response.setCode(ResponseCode.TOPIC_NOT_EXIST); + response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); + return response; + } + } + + int queueIdInt = requestHeader.getQueueId(); + int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums()); + if (queueIdInt >= idValid) { + String errorInfo = String.format("request queueId[%d] is illagal, %s Producer: %s", + queueIdInt, + topicConfig.toString(), + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + + log.warn(errorInfo); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(errorInfo); + + return response; + } + return response; + } + + public void registerSendMessageHook(List<SendMessageHook> sendMessageHookList) { + this.sendMessageHookList = sendMessageHookList; + } + + protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request, + final RemotingCommand response) { + if (!request.isOnewayRPC()) { + try { + ctx.writeAndFlush(response); + } catch (Throwable e) { + log.error("SendMessageProcessor process request over, but response failed", e); + log.error(request.toString()); + log.error(response.toString()); + } + } + } + + public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request, + SendMessageContext context) { + if (hasSendMessageHook()) { + for (SendMessageHook hook : this.sendMessageHookList) { + try { + final SendMessageRequestHeader requestHeader = parseRequestHeader(request); + + if (null != requestHeader) { + context.setProducerGroup(requestHeader.getProducerGroup()); + context.setTopic(requestHeader.getTopic()); + context.setBodyLength(request.getBody().length); + context.setMsgProps(requestHeader.getProperties()); + context.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + context.setBrokerAddr(this.brokerController.getBrokerAddr()); + context.setQueueId(requestHeader.getQueueId()); + } + + hook.sendMessageBefore(context); + requestHeader.setProperties(context.getMsgProps()); + } catch (Throwable e) { + } + } + } + } + + protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request) + throws RemotingCommandException { + + SendMessageRequestHeaderV2 requestHeaderV2 = null; + SendMessageRequestHeader requestHeader = null; + switch (request.getCode()) { + case RequestCode.SEND_MESSAGE_V2: + requestHeaderV2 = + (SendMessageRequestHeaderV2) request + .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); + case RequestCode.SEND_MESSAGE: + if (null == requestHeaderV2) { + requestHeader = + (SendMessageRequestHeader) request + .decodeCommandCustomHeader(SendMessageRequestHeader.class); + } else { + requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2); + } + default: + break; + } + return requestHeader; + } + + public void executeSendMessageHookAfter(final RemotingCommand response, final SendMessageContext context) { + if (hasSendMessageHook()) { + for (SendMessageHook hook : this.sendMessageHookList) { + try { + if (response != null) { + final SendMessageResponseHeader responseHeader = + (SendMessageResponseHeader) response.readCustomHeader(); + context.setMsgId(responseHeader.getMsgId()); + context.setQueueId(responseHeader.getQueueId()); + context.setQueueOffset(responseHeader.getQueueOffset()); + context.setCode(response.getCode()); + context.setErrorMsg(response.getRemark()); + } + hook.sendMessageAfter(context); + } catch (Throwable e) { + + } + } + } + } + + @Override + public boolean rejectRequest() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java new file mode 100644 index 0000000..c1241bb --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -0,0 +1,1212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.processor; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.client.ConsumerGroupInfo; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.admin.ConsumeStats; +import org.apache.rocketmq.common.admin.OffsetWrapper; +import org.apache.rocketmq.common.admin.TopicOffset; +import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageId; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.*; +import org.apache.rocketmq.common.protocol.header.*; +import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader; +import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.common.stats.StatsItem; +import org.apache.rocketmq.common.stats.StatsSnapshot; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.SelectMappedBufferResult; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UnsupportedEncodingException; +import java.net.UnknownHostException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * @author shijia.wxr + * @author manhong.yqd + */ +public class AdminBrokerProcessor implements NettyRequestProcessor { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private final BrokerController brokerController; + + public AdminBrokerProcessor(final BrokerController brokerController) { + this.brokerController = brokerController; + } + + @Override + public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + switch (request.getCode()) { + case RequestCode.UPDATE_AND_CREATE_TOPIC: + return this.updateAndCreateTopic(ctx, request); + case RequestCode.DELETE_TOPIC_IN_BROKER: + return this.deleteTopic(ctx, request); + case RequestCode.GET_ALL_TOPIC_CONFIG: + return this.getAllTopicConfig(ctx, request); + case RequestCode.UPDATE_BROKER_CONFIG: + return this.updateBrokerConfig(ctx, request); + case RequestCode.GET_BROKER_CONFIG: + return this.getBrokerConfig(ctx, request); + case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP: + return this.searchOffsetByTimestamp(ctx, request); + case RequestCode.GET_MAX_OFFSET: + return this.getMaxOffset(ctx, request); + case RequestCode.GET_MIN_OFFSET: + return this.getMinOffset(ctx, request); + case RequestCode.GET_EARLIEST_MSG_STORETIME: + return this.getEarliestMsgStoretime(ctx, request); + case RequestCode.GET_BROKER_RUNTIME_INFO: + return this.getBrokerRuntimeInfo(ctx, request); + case RequestCode.LOCK_BATCH_MQ: + return this.lockBatchMQ(ctx, request); + case RequestCode.UNLOCK_BATCH_MQ: + return this.unlockBatchMQ(ctx, request); + case RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP: + return this.updateAndCreateSubscriptionGroup(ctx, request); + case RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG: + return this.getAllSubscriptionGroup(ctx, request); + case RequestCode.DELETE_SUBSCRIPTIONGROUP: + return this.deleteSubscriptionGroup(ctx, request); + case RequestCode.GET_TOPIC_STATS_INFO: + return this.getTopicStatsInfo(ctx, request); + case RequestCode.GET_CONSUMER_CONNECTION_LIST: + return this.getConsumerConnectionList(ctx, request); + case RequestCode.GET_PRODUCER_CONNECTION_LIST: + return this.getProducerConnectionList(ctx, request); + case RequestCode.GET_CONSUME_STATS: + return this.getConsumeStats(ctx, request); + case RequestCode.GET_ALL_CONSUMER_OFFSET: + return this.getAllConsumerOffset(ctx, request); + case RequestCode.GET_ALL_DELAY_OFFSET: + return this.getAllDelayOffset(ctx, request); + case RequestCode.INVOKE_BROKER_TO_RESET_OFFSET: + return this.resetOffset(ctx, request); + case RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS: + return this.getConsumerStatus(ctx, request); + case RequestCode.QUERY_TOPIC_CONSUME_BY_WHO: + return this.queryTopicConsumeByWho(ctx, request); + case RequestCode.REGISTER_FILTER_SERVER: + return this.registerFilterServer(ctx, request); + case RequestCode.QUERY_CONSUME_TIME_SPAN: + return this.queryConsumeTimeSpan(ctx, request); + case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER: + return this.getSystemTopicListFromBroker(ctx, request); + case RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE: + return this.cleanExpiredConsumeQueue(); + case RequestCode.CLEAN_UNUSED_TOPIC: + return this.cleanUnusedTopic(); + case RequestCode.GET_CONSUMER_RUNNING_INFO: + return this.getConsumerRunningInfo(ctx, request); + case RequestCode.QUERY_CORRECTION_OFFSET: + return this.queryCorrectionOffset(ctx, request); + case RequestCode.CONSUME_MESSAGE_DIRECTLY: + return this.consumeMessageDirectly(ctx, request); + case RequestCode.CLONE_GROUP_OFFSET: + return this.cloneGroupOffset(ctx, request); + case RequestCode.VIEW_BROKER_STATS_DATA: + return ViewBrokerStatsData(ctx, request); + case RequestCode.GET_BROKER_CONSUME_STATS: + return fetchAllConsumeStatsInBroker(ctx, request); + default: + break; + } + + return null; + } + + @Override + public boolean rejectRequest() { + return false; + } + + private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + final CreateTopicRequestHeader requestHeader = + (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class); + log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + + + if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) { + String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; + log.warn(errorMsg); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(errorMsg); + return response; + } + + try { + response.setCode(ResponseCode.SUCCESS); + response.setOpaque(request.getOpaque()); + response.markResponseType(); + response.setRemark(null); + ctx.writeAndFlush(response); + } catch (Exception e) { + } + + TopicConfig topicConfig = new TopicConfig(requestHeader.getTopic()); + topicConfig.setReadQueueNums(requestHeader.getReadQueueNums()); + topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums()); + topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum()); + topicConfig.setPerm(requestHeader.getPerm()); + topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag()); + + this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); + this.brokerController.registerBrokerAll(false, true); + return null; + } + + private RemotingCommand deleteTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + DeleteTopicRequestHeader requestHeader = + (DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class); + + log.info("deleteTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + + this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic()); + this.brokerController.getMessageStore() + .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) { + final RemotingCommand response = RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class); + // final GetAllTopicConfigResponseHeader responseHeader = + // (GetAllTopicConfigResponseHeader) response.readCustomHeader(); + + String content = this.brokerController.getTopicConfigManager().encode(); + if (content != null && content.length() > 0) { + try { + response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); + } catch (UnsupportedEncodingException e) { + log.error("", e); + + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("UnsupportedEncodingException " + e); + return response; + } + } else { + log.error("No topic in this broker, client: " + ctx.channel().remoteAddress()); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("No topic in this broker"); + return response; + } + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + + return response; + } + + private RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + + log.info("updateBrokerConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + + byte[] body = request.getBody(); + if (body != null) { + try { + String bodyStr = new String(body, MixAll.DEFAULT_CHARSET); + Properties properties = MixAll.string2Properties(bodyStr); + if (properties != null) { + log.info("updateBrokerConfig, new config: " + properties + " client: " + ctx.channel().remoteAddress()); + this.brokerController.getConfiguration().update(properties); + if (properties.containsKey("brokerPermission")) { + this.brokerController.registerBrokerAll(false, false); + this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(); + } + } else { + log.error("string2Properties error"); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("string2Properties error"); + return response; + } + } catch (UnsupportedEncodingException e) { + log.error("", e); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("UnsupportedEncodingException " + e); + return response; + } + } + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand getBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) { + + final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerConfigResponseHeader.class); + final GetBrokerConfigResponseHeader responseHeader = (GetBrokerConfigResponseHeader) response.readCustomHeader(); + + String content = this.brokerController.getConfiguration().getAllConfigsFormatString(); + if (content != null && content.length() > 0) { + try { + response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); + } catch (UnsupportedEncodingException e) { + log.error("", e); + + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("UnsupportedEncodingException " + e); + return response; + } + } + + responseHeader.setVersion(this.brokerController.getConfiguration().getDataVersionJson()); + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class); + final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader(); + final SearchOffsetRequestHeader requestHeader = + (SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class); + + long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(), + requestHeader.getTimestamp()); + + responseHeader.setOffset(offset); + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand getMaxOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); + final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader(); + final GetMaxOffsetRequestHeader requestHeader = + (GetMaxOffsetRequestHeader) request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class); + + long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(requestHeader.getTopic(), requestHeader.getQueueId()); + + responseHeader.setOffset(offset); + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand getMinOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); + final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader(); + final GetMinOffsetRequestHeader requestHeader = + (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class); + + long offset = this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(), requestHeader.getQueueId()); + + responseHeader.setOffset(offset); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class); + final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) response.readCustomHeader(); + final GetEarliestMsgStoretimeRequestHeader requestHeader = + (GetEarliestMsgStoretimeRequestHeader) request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class); + + long timestamp = + this.brokerController.getMessageStore().getEarliestMessageTime(requestHeader.getTopic(), requestHeader.getQueueId()); + + responseHeader.setTimestamp(timestamp); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand getBrokerRuntimeInfo(ChannelHandlerContext ctx, RemotingCommand request) { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + + HashMap<String, String> runtimeInfo = this.prepareRuntimeInfo(); + KVTable kvTable = new KVTable(); + kvTable.setTable(runtimeInfo); + + byte[] body = kvTable.encode(); + response.setBody(body); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class); + + Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(// + requestBody.getConsumerGroup(), // + requestBody.getMqSet(), // + requestBody.getClientId()); + + LockBatchResponseBody responseBody = new LockBatchResponseBody(); + responseBody.setLockOKMQSet(lockOKMQSet); + + response.setBody(responseBody.encode()); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class); + + this.brokerController.getRebalanceLockManager().unlockBatch(// + requestBody.getConsumerGroup(), // + requestBody.getMqSet(), // + requestBody.getClientId()); + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) + throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + + log.info("updateAndCreateSubscriptionGroup called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + + SubscriptionGroupConfig config = RemotingSerializable.decode(request.getBody(), SubscriptionGroupConfig.class); + if (config != null) { + this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfig(config); + } + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand getAllSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + String content = this.brokerController.getSubscriptionGroupManager().encode(); + if (content != null && content.length() > 0) { + try { + response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); + } catch (UnsupportedEncodingException e) { + log.error("", e); + + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("UnsupportedEncodingException " + e); + return response; + } + } else { + log.error("No subscription group in this broker, client: " + ctx.channel().remoteAddress()); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("No subscription group in this broker"); + return response; + } + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + + return response; + } + + private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + DeleteSubscriptionGroupRequestHeader requestHeader = + (DeleteSubscriptionGroupRequestHeader) request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class); + + log.info("deleteSubscriptionGroup called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + + this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(requestHeader.getGroupName()); + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + final GetTopicStatsInfoRequestHeader requestHeader = + (GetTopicStatsInfoRequestHeader) request.decodeCommandCustomHeader(GetTopicStatsInfoRequestHeader.class); + + final String topic = requestHeader.getTopic(); + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); + if (null == topicConfig) { + response.setCode(ResponseCode.TOPIC_NOT_EXIST); + response.setRemark("topic[" + topic + "] not exist"); + return response; + } + + TopicStatsTable topicStatsTable = new TopicStatsTable(); + for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) { + MessageQueue mq = new MessageQueue(); + mq.setTopic(topic); + mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); + mq.setQueueId(i); + + TopicOffset topicOffset = new TopicOffset(); + long min = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, i); + if (min < 0) + min = 0; + + long max = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i); + if (max < 0) + max = 0; + + long timestamp = 0; + if (max > 0) { + timestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, max - 1); + } + + topicOffset.setMinOffset(min); + topicOffset.setMaxOffset(max); + topicOffset.setLastUpdateTimestamp(timestamp); + + topicStatsTable.getOffsetTable().put(mq, topicOffset); + } + + byte[] body = topicStatsTable.encode(); + response.setBody(body); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + final GetConsumerConnectionListRequestHeader requestHeader = + (GetConsumerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class); + + ConsumerGroupInfo consumerGroupInfo = + this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup()); + if (consumerGroupInfo != null) { + ConsumerConnection bodydata = new ConsumerConnection(); + bodydata.setConsumeFromWhere(consumerGroupInfo.getConsumeFromWhere()); + bodydata.setConsumeType(consumerGroupInfo.getConsumeType()); + bodydata.setMessageModel(consumerGroupInfo.getMessageModel()); + bodydata.getSubscriptionTable().putAll(consumerGroupInfo.getSubscriptionTable()); + + Iterator<Map.Entry<Channel, ClientChannelInfo>> it = consumerGroupInfo.getChannelInfoTable().entrySet().iterator(); + while (it.hasNext()) { + ClientChannelInfo info = it.next().getValue(); + Connection connection = new Connection(); + connection.setClientId(info.getClientId()); + connection.setLanguage(info.getLanguage()); + connection.setVersion(info.getVersion()); + connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getChannel())); + + bodydata.getConnectionSet().add(connection); + } + + byte[] body = bodydata.encode(); + response.setBody(body); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + + return response; + } + + response.setCode(ResponseCode.CONSUMER_NOT_ONLINE); + response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] not online"); + return response; + } + + private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + final GetProducerConnectionListRequestHeader requestHeader = + (GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class); + + ProducerConnection bodydata = new ProducerConnection(); + HashMap<Channel, ClientChannelInfo> channelInfoHashMap = + this.brokerController.getProducerManager().getGroupChannelTable().get(requestHeader.getProducerGroup()); + if (channelInfoHashMap != null) { + Iterator<Map.Entry<Channel, ClientChannelInfo>> it = channelInfoHashMap.entrySet().iterator(); + while (it.hasNext()) { + ClientChannelInfo info = it.next().getValue(); + Connection connection = new Connection(); + connection.setClientId(info.getClientId()); + connection.setLanguage(info.getLanguage()); + connection.setVersion(info.getVersion()); + connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getChannel())); + + bodydata.getConnectionSet().add(connection); + } + + byte[] body = bodydata.encode(); + response.setBody(body); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("the producer group[" + requestHeader.getProducerGroup() + "] not exist"); + return response; + } + + private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + final GetConsumeStatsRequestHeader requestHeader = + (GetConsumeStatsRequestHeader) request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class); + + ConsumeStats consumeStats = new ConsumeStats(); + + Set<String> topics = new HashSet<String>(); + if (UtilAll.isBlank(requestHeader.getTopic())) { + topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup()); + } else { + topics.add(requestHeader.getTopic()); + } + + for (String topic : topics) { + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); + if (null == topicConfig) { + log.warn("consumeStats, topic config not exist, {}", topic); + continue; + } + + /** + + */ + { + SubscriptionData findSubscriptionData = + this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic); + + if (null == findSubscriptionData // + && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) { + log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", requestHeader.getConsumerGroup(), topic); + continue; + } + } + + for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { + MessageQueue mq = new MessageQueue(); + mq.setTopic(topic); + mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); + mq.setQueueId(i); + + OffsetWrapper offsetWrapper = new OffsetWrapper(); + + long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i); + if (brokerOffset < 0) + brokerOffset = 0; + + long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(// + requestHeader.getConsumerGroup(), // + topic, // + i); + if (consumerOffset < 0) + consumerOffset = 0; + + offsetWrapper.setBrokerOffset(brokerOffset); + offsetWrapper.setConsumerOffset(consumerOffset); + + + long timeOffset = consumerOffset - 1; + if (timeOffset >= 0) { + long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset); + if (lastTimestamp > 0) { + offsetWrapper.setLastTimestamp(lastTimestamp); + } + } + + consumeStats.getOffsetTable().put(mq, offsetWrapper); + } + + double consumeTps = this.brokerController.getBrokerStatsManager().tpsGroupGetNums(requestHeader.getConsumerGroup(), topic); + + consumeTps += consumeStats.getConsumeTps(); + consumeStats.setConsumeTps(consumeTps); + } + + byte[] body = consumeStats.encode(); + response.setBody(body); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + + String content = this.brokerController.getConsumerOffsetManager().encode(); + if (content != null && content.length() > 0) { + try { + response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); + } catch (UnsupportedEncodingException e) { + log.error("get all consumer offset from master error.", e); + + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("UnsupportedEncodingException " + e); + return response; + } + } else { + log.error("No consumer offset in this broker, client: " + ctx.channel().remoteAddress()); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("No consumer offset in this broker"); + return response; + } + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + + return response; + } + + private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + + String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode(); + if (content != null && content.length() > 0) { + try { + response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); + } catch (UnsupportedEncodingException e) { + log.error("get all delay offset from master error.", e); + + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("UnsupportedEncodingException " + e); + return response; + } + } else { + log.error("No delay offset in this broker, client: " + ctx.channel().remoteAddress()); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("No delay offset in this broker"); + return response; + } + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + + return response; + } + + public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final ResetOffsetRequestHeader requestHeader = + (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class); + log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}", + new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), + requestHeader.getTimestamp(), requestHeader.isForce()}); + boolean isC = false; + LanguageCode language = request.getLanguage(); + switch (language) { + case CPP: + isC = true; + break; + } + return this.brokerController.getBroker2Client().resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), + requestHeader.getTimestamp(), requestHeader.isForce(), isC); + } + + public RemotingCommand getConsumerStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final GetConsumerStatusRequestHeader requestHeader = + (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class); + + log.info("[get-consumer-status] get consumer status by {}. topic={}, group={}", + new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup()}); + + return this.brokerController.getBroker2Client().getConsumeStatus(requestHeader.getTopic(), requestHeader.getGroup(), + requestHeader.getClientAddr()); + } + + private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + QueryTopicConsumeByWhoRequestHeader requestHeader = + (QueryTopicConsumeByWhoRequestHeader) request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class); + + + HashSet<String> groups = this.brokerController.getConsumerManager().queryTopicConsumeByWho(requestHeader.getTopic()); + + Set<String> groupInOffset = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(requestHeader.getTopic()); + if (groupInOffset != null && !groupInOffset.isEmpty()) { + groups.addAll(groupInOffset); + } + + GroupList groupList = new GroupList(); + groupList.setGroupList(groups); + byte[] body = groupList.encode(); + + response.setBody(body); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand registerFilterServer(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class); + final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader) response.readCustomHeader(); + final RegisterFilterServerRequestHeader requestHeader = + (RegisterFilterServerRequestHeader) request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class); + + this.brokerController.getFilterServerManager().registerFilterServer(ctx.channel(), requestHeader.getFilterServerAddr()); + + responseHeader.setBrokerId(this.brokerController.getBrokerConfig().getBrokerId()); + responseHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + QueryConsumeTimeSpanRequestHeader requestHeader = + (QueryConsumeTimeSpanRequestHeader) request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class); + + final String topic = requestHeader.getTopic(); + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); + if (null == topicConfig) { + response.setCode(ResponseCode.TOPIC_NOT_EXIST); + response.setRemark("topic[" + topic + "] not exist"); + return response; + } + + List<QueueTimeSpan> timeSpanSet = new ArrayList<QueueTimeSpan>(); + for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) { + QueueTimeSpan timeSpan = new QueueTimeSpan(); + MessageQueue mq = new MessageQueue(); + mq.setTopic(topic); + mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); + mq.setQueueId(i); + timeSpan.setMessageQueue(mq); + + long minTime = this.brokerController.getMessageStore().getEarliestMessageTime(topic, i); + timeSpan.setMinTimeStamp(minTime); + + long max = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i); + long maxTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, max - 1); + timeSpan.setMaxTimeStamp(maxTime); + + long consumeTime; + long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset( + requestHeader.getGroup(), topic, i); + if (consumerOffset > 0) { + consumeTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset - 1); + } else { + consumeTime = minTime; + } + timeSpan.setConsumeTimeStamp(consumeTime); + + long maxBrokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(requestHeader.getTopic(), i); + if (consumerOffset < maxBrokerOffset) { + long nextTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset); + timeSpan.setDelayTime(System.currentTimeMillis() - nextTime); + } + timeSpanSet.add(timeSpan); + } + + QueryConsumeTimeSpanBody queryConsumeTimeSpanBody = new QueryConsumeTimeSpanBody(); + queryConsumeTimeSpanBody.setConsumeTimeSpanSet(timeSpanSet); + response.setBody(queryConsumeTimeSpanBody.encode()); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand getSystemTopicListFromBroker(ChannelHandlerContext ctx, RemotingCommand request) + throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + + Set<String> topics = this.brokerController.getTopicConfigManager().getSystemTopic(); + TopicList topicList = new TopicList(); + topicList.setTopicList(topics); + response.setBody(topicList.encode()); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + public RemotingCommand cleanExpiredConsumeQueue() { + log.warn("invoke cleanExpiredConsumeQueue start."); + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + brokerController.getMessageStore().cleanExpiredConsumerQueue(); + log.warn("invoke cleanExpiredConsumeQueue end."); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + public RemotingCommand cleanUnusedTopic() { + log.warn("invoke cleanUnusedTopic start."); + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + brokerController.getMessageStore().cleanUnusedTopic(brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); + log.warn("invoke cleanUnusedTopic end."); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + /** + + */ + private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final GetConsumerRunningInfoRequestHeader requestHeader = + (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); + + return this.callConsumer(RequestCode.GET_CONSUMER_RUNNING_INFO, request, requestHeader.getConsumerGroup(), + requestHeader.getClientId()); + } + + private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + QueryCorrectionOffsetHeader requestHeader = + (QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class); + + Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager() + .queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups()); + + Map<Integer, Long> compareOffset = + this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup()); + + if (compareOffset != null && !compareOffset.isEmpty()) { + for (Map.Entry<Integer, Long> entry : compareOffset.entrySet()) { + Integer queueId = entry.getKey(); + correctionOffset.put(queueId, + correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId)); + } + } + + QueryCorrectionOffsetBody body = new QueryCorrectionOffsetBody(); + body.setCorrectionOffsets(correctionOffset); + response.setBody(body.encode()); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader) request + .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class); + + request.getExtFields().put("brokerName", this.brokerController.getBrokerConfig().getBrokerName()); + SelectMappedBufferResult selectMappedBufferResult = null; + try { + MessageId messageId = MessageDecoder.decodeMessageId(requestHeader.getMsgId()); + selectMappedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(messageId.getOffset()); + + byte[] body = new byte[selectMappedBufferResult.getSize()]; + selectMappedBufferResult.getByteBuffer().get(body); + request.setBody(body); + } catch (UnknownHostException e) { + } finally { + if (selectMappedBufferResult != null) { + selectMappedBufferResult.release(); + } + } + + return this.callConsumer(RequestCode.CONSUME_MESSAGE_DIRECTLY, request, requestHeader.getConsumerGroup(), + requestHeader.getClientId()); + } + + private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + CloneGroupOffsetRequestHeader requestHeader = + (CloneGroupOffsetRequestHeader) request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class); + + Set<String> topics; + if (UtilAll.isBlank(requestHeader.getTopic())) { + topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getSrcGroup()); + } else { + topics = new HashSet<String>(); + topics.add(requestHeader.getTopic()); + } + + for (String topic : topics) { + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); + if (null == topicConfig) { + log.warn("[cloneGroupOffset], topic config not exist, {}", topic); + continue; + } + + /** + + */ + if (!requestHeader.isOffline()) { + + SubscriptionData findSubscriptionData = + this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getSrcGroup(), topic); + if (this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getSrcGroup()) > 0 + && findSubscriptionData == null) { + log.warn("[cloneGroupOffset], the consumer group[{}], topic[{}] not exist", requestHeader.getSrcGroup(), topic); + continue; + } + } + + this.brokerController.getConsumerOffsetManager().cloneOffset(requestHeader.getSrcGroup(), requestHeader.getDestGroup(), + requestHeader.getTopic()); + } + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final ViewBrokerStatsDataRequestHeader requestHeader = + (ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class); + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore(); + + StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey()); + if (null == statsItem) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(String.format("The stats <%s> <%s> not exist", requestHeader.getStatsName(), requestHeader.getStatsKey())); + return response; + } + + BrokerStatsData brokerStatsData = new BrokerStatsData(); + + { + BrokerStatsItem it = new BrokerStatsItem(); + StatsSnapshot ss = statsItem.getStatsDataInMinute(); + it.setSum(ss.getSum()); + it.setTps(ss.getTps()); + it.setAvgpt(ss.getAvgpt()); + brokerStatsData.setStatsMinute(it); + } + + + { + BrokerStatsItem it = new BrokerStatsItem(); + StatsSnapshot ss = statsItem.getStatsDataInHour(); + it.setSum(ss.getSum()); + it.setTps(ss.getTps()); + it.setAvgpt(ss.getAvgpt()); + brokerStatsData.setStatsHour(it); + } + + + { + BrokerStatsItem it = new BrokerStatsItem(); + StatsSnapshot ss = statsItem.getStatsDataInDay(); + it.setSum(ss.getSum()); + it.setTps(ss.getTps()); + it.setAvgpt(ss.getAvgpt()); + brokerStatsData.setStatsDay(it); + } + + response.setBody(brokerStatsData.encode()); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand fetchAllConsumeStatsInBroker(ChannelHandlerContext ctx, RemotingCommand request) + throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + GetConsumeStatsInBrokerHeader requestHeader = + (GetConsumeStatsInBrokerHeader) request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class); + boolean isOrder = requestHeader.isOrder(); + ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroups = + brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable(); + + List<Map<String/* subscriptionGroupName */, List<ConsumeStats>>> brokerConsumeStatsList = + new ArrayList<Map<String, List<ConsumeStats>>>(); + + long totalDiff = 0L; + + for (String group : subscriptionGroups.keySet()) { + Map<String, List<ConsumeStats>> subscripTopicConsumeMap = new HashMap<String, List<ConsumeStats>>(); + Set<String> topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group); + List<ConsumeStats> consumeStatsList = new ArrayList<ConsumeStats>(); + for (String topic : topics) { + ConsumeStats consumeStats = new ConsumeStats(); + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); + if (null == topicConfig) { + log.warn("consumeStats, topic config not exist, {}", topic); + continue; + } + + if (isOrder && !topicConfig.isOrder()) { + continue; + } + /** + + */ + { + SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic); + + if (null == findSubscriptionData // + && this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0) { + log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", group, topic); + continue; + } + } + + for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) { + MessageQueue mq = new MessageQueue(); + mq.setTopic(topic); + mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); + mq.setQueueId(i); + OffsetWrapper offsetWrapper = new OffsetWrapper(); + long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i); + if (brokerOffset < 0) + brokerOffset = 0; + long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(// + group, // + topic, // + i); + if (consumerOffset < 0) + consumerOffset = 0; + + offsetWrapper.setBrokerOffset(brokerOffset); + offsetWrapper.setConsumerOffset(consumerOffset); + + + long timeOffset = consumerOffset - 1; + if (timeOffset >= 0) { + long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset); + if (lastTimestamp > 0) { + offsetWrapper.setLastTimestamp(lastTimestamp); + } + } + consumeStats.getOffsetTable().put(mq, offsetWrapper); + } + double consumeTps = this.brokerController.getBrokerStatsManager().tpsGroupGetNums(group, topic); + consumeTps += consumeStats.getConsumeTps(); + consumeStats.setConsumeTps(consumeTps); + totalDiff += consumeStats.computeTotalDiff(); + consumeStatsList.add(consumeStats); + } + subscripTopicConsumeMap.put(group, consumeStatsList); + brokerConsumeStatsList.add(subscripTopicConsumeMap); + } + ConsumeStatsList consumeStats = new ConsumeStatsList(); + consumeStats.setBrokerAddr(brokerController.getBrokerAddr()); + consumeStats.setConsumeStatsList(brokerConsumeStatsList); + consumeStats.setTotalDiff(totalDiff); + response.setBody(consumeStats.encode()); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private HashMap<String, String> prepareRuntimeInfo() { + HashMap<String, String> runtimeInfo = this.brokerController.getMessageStore().getRuntimeInfo(); + runtimeInfo.put("brokerVersionDesc", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION)); + runtimeInfo.put("brokerVersion", String.valueOf(MQVersion.CURRENT_VERSION)); + + runtimeInfo.put("msgPutTotalYesterdayMorning", + String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalYesterdayMorning())); + runtimeInfo.put("msgPutTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayMorning())); + runtimeInfo.put("msgPutTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayNow())); + + runtimeInfo.put("msgGetTotalYesterdayMorning", + String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalYesterdayMorning())); + runtimeInfo.put("msgGetTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayMorning())); + runtimeInfo.put("msgGetTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayNow())); + + runtimeInfo.put("sendThreadPoolQueueSize", String.valueOf(this.brokerController.getSendThreadPoolQueue().size())); + + runtimeInfo.put("sendThreadPoolQueueCapacity", + String.valueOf(this.brokerController.getBrokerConfig().getSendThreadPoolQueueCapacity())); + + runtimeInfo.put("pullThreadPoolQueueSize", String.valueOf(this.brokerController.getPullThreadPoolQueue().size())); + runtimeInfo.put("pullThreadPoolQueueCapacity", + String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity())); + + runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes())); + runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills())); + + runtimeInfo.put("sendThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4SendThreadPoolQueue())); + runtimeInfo.put("pullThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4PullThreadPoolQueue())); + runtimeInfo.put("earliestMessageTimeStamp", String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime())); + runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp())); + if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) { + DefaultMessageStore defaultMessageStore = (DefaultMessageStore) this.brokerController.getMessageStore(); + runtimeInfo.put("remainTransientStoreBufferNumbs", String.valueOf(defaultMessageStore.remainTransientStoreBufferNumbs())); + if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { + runtimeInfo.put("remainHowManyDataToCommit", MixAll.humanReadableByteCount(defaultMessageStore.getCommitLog().remainHowManyDataToCommit(), false)); + } + runtimeInfo.put("remainHowManyDataToFlush", MixAll.humanReadableByteCount(defaultMessageStore.getCommitLog().remainHowManyDataToFlush(), false)); + } + + java.io.File commitLogDir = new java.io.File(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); + if (commitLogDir.exists()) { + runtimeInfo.put("commitLogDirCapacity", String.format("Total : %s, Free : %s.", MixAll.humanReadableByteCount(commitLogDir.getTotalSpace(), false), MixAll.humanReadableByteCount(commitLogDir.getFreeSpace(), false))); + } + + return runtimeInfo; + } + + private RemotingCommand callConsumer(// + final int requestCode, // + final RemotingCommand request, // + final String consumerGroup, // + final String clientId) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId); + + if (null == clientChannelInfo) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(String.format("The Consumer <%s> <%s> not online", consumerGroup, clientId)); + return response; + } + + if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", // + clientId, // + MQVersion.getVersionDesc(clientChannelInfo.getVersion()))); + return response; + } + + try { + RemotingCommand newRequest = RemotingCommand.createRequestCommand(requestCode, null); + newRequest.setExtFields(request.getExtFields()); + newRequest.setBody(request.getBody()); + + RemotingCommand consumerResponse = + this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest); + return consumerResponse; + } catch (RemotingTimeoutException e) { + response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT); + response + .setRemark(String.format("consumer <%s> <%s> Timeout: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e))); + return response; + } catch (Exception e) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark( + String.format("invoke consumer <%s> <%s> Exception: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e))); + return response; + } + } + +}
