http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/AdminBrokerProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/AdminBrokerProcessor.java deleted file mode 100644 index b45a866..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/AdminBrokerProcessor.java +++ /dev/null @@ -1,1212 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.processor; - -import com.alibaba.rocketmq.broker.BrokerController; -import com.alibaba.rocketmq.broker.client.ClientChannelInfo; -import com.alibaba.rocketmq.broker.client.ConsumerGroupInfo; -import com.alibaba.rocketmq.common.MQVersion; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.TopicConfig; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.admin.ConsumeStats; -import com.alibaba.rocketmq.common.admin.OffsetWrapper; -import com.alibaba.rocketmq.common.admin.TopicOffset; -import com.alibaba.rocketmq.common.admin.TopicStatsTable; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.message.MessageDecoder; -import com.alibaba.rocketmq.common.message.MessageId; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.protocol.RequestCode; -import com.alibaba.rocketmq.common.protocol.ResponseCode; -import com.alibaba.rocketmq.common.protocol.body.*; -import com.alibaba.rocketmq.common.protocol.header.*; -import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; -import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; -import com.alibaba.rocketmq.common.stats.StatsItem; -import com.alibaba.rocketmq.common.stats.StatsSnapshot; -import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; -import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; -import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; -import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor; -import com.alibaba.rocketmq.remoting.protocol.LanguageCode; -import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; -import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; -import com.alibaba.rocketmq.store.DefaultMessageStore; -import com.alibaba.rocketmq.store.SelectMappedBufferResult; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.UnsupportedEncodingException; -import java.net.UnknownHostException; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - - -/** - * @author shijia.wxr - * @author manhong.yqd - */ -public class AdminBrokerProcessor implements NettyRequestProcessor { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private final BrokerController brokerController; - - public AdminBrokerProcessor(final BrokerController brokerController) { - this.brokerController = brokerController; - } - - @Override - public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - switch (request.getCode()) { - case RequestCode.UPDATE_AND_CREATE_TOPIC: - return this.updateAndCreateTopic(ctx, request); - case RequestCode.DELETE_TOPIC_IN_BROKER: - return this.deleteTopic(ctx, request); - case RequestCode.GET_ALL_TOPIC_CONFIG: - return this.getAllTopicConfig(ctx, request); - case RequestCode.UPDATE_BROKER_CONFIG: - return this.updateBrokerConfig(ctx, request); - case RequestCode.GET_BROKER_CONFIG: - return this.getBrokerConfig(ctx, request); - case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP: - return this.searchOffsetByTimestamp(ctx, request); - case RequestCode.GET_MAX_OFFSET: - return this.getMaxOffset(ctx, request); - case RequestCode.GET_MIN_OFFSET: - return this.getMinOffset(ctx, request); - case RequestCode.GET_EARLIEST_MSG_STORETIME: - return this.getEarliestMsgStoretime(ctx, request); - case RequestCode.GET_BROKER_RUNTIME_INFO: - return this.getBrokerRuntimeInfo(ctx, request); - case RequestCode.LOCK_BATCH_MQ: - return this.lockBatchMQ(ctx, request); - case RequestCode.UNLOCK_BATCH_MQ: - return this.unlockBatchMQ(ctx, request); - case RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP: - return this.updateAndCreateSubscriptionGroup(ctx, request); - case RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG: - return this.getAllSubscriptionGroup(ctx, request); - case RequestCode.DELETE_SUBSCRIPTIONGROUP: - return this.deleteSubscriptionGroup(ctx, request); - case RequestCode.GET_TOPIC_STATS_INFO: - return this.getTopicStatsInfo(ctx, request); - case RequestCode.GET_CONSUMER_CONNECTION_LIST: - return this.getConsumerConnectionList(ctx, request); - case RequestCode.GET_PRODUCER_CONNECTION_LIST: - return this.getProducerConnectionList(ctx, request); - case RequestCode.GET_CONSUME_STATS: - return this.getConsumeStats(ctx, request); - case RequestCode.GET_ALL_CONSUMER_OFFSET: - return this.getAllConsumerOffset(ctx, request); - case RequestCode.GET_ALL_DELAY_OFFSET: - return this.getAllDelayOffset(ctx, request); - case RequestCode.INVOKE_BROKER_TO_RESET_OFFSET: - return this.resetOffset(ctx, request); - case RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS: - return this.getConsumerStatus(ctx, request); - case RequestCode.QUERY_TOPIC_CONSUME_BY_WHO: - return this.queryTopicConsumeByWho(ctx, request); - case RequestCode.REGISTER_FILTER_SERVER: - return this.registerFilterServer(ctx, request); - case RequestCode.QUERY_CONSUME_TIME_SPAN: - return this.queryConsumeTimeSpan(ctx, request); - case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER: - return this.getSystemTopicListFromBroker(ctx, request); - case RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE: - return this.cleanExpiredConsumeQueue(); - case RequestCode.CLEAN_UNUSED_TOPIC: - return this.cleanUnusedTopic(); - case RequestCode.GET_CONSUMER_RUNNING_INFO: - return this.getConsumerRunningInfo(ctx, request); - case RequestCode.QUERY_CORRECTION_OFFSET: - return this.queryCorrectionOffset(ctx, request); - case RequestCode.CONSUME_MESSAGE_DIRECTLY: - return this.consumeMessageDirectly(ctx, request); - case RequestCode.CLONE_GROUP_OFFSET: - return this.cloneGroupOffset(ctx, request); - case RequestCode.VIEW_BROKER_STATS_DATA: - return ViewBrokerStatsData(ctx, request); - case RequestCode.GET_BROKER_CONSUME_STATS: - return fetchAllConsumeStatsInBroker(ctx, request); - default: - break; - } - - return null; - } - - @Override - public boolean rejectRequest() { - return false; - } - - private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - final CreateTopicRequestHeader requestHeader = - (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class); - log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - - - if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) { - String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; - log.warn(errorMsg); - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(errorMsg); - return response; - } - - try { - response.setCode(ResponseCode.SUCCESS); - response.setOpaque(request.getOpaque()); - response.markResponseType(); - response.setRemark(null); - ctx.writeAndFlush(response); - } catch (Exception e) { - } - - TopicConfig topicConfig = new TopicConfig(requestHeader.getTopic()); - topicConfig.setReadQueueNums(requestHeader.getReadQueueNums()); - topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums()); - topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum()); - topicConfig.setPerm(requestHeader.getPerm()); - topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag()); - - this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - this.brokerController.registerBrokerAll(false, true); - return null; - } - - private RemotingCommand deleteTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - DeleteTopicRequestHeader requestHeader = - (DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class); - - log.info("deleteTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - - this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic()); - this.brokerController.getMessageStore() - .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) { - final RemotingCommand response = RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class); - // final GetAllTopicConfigResponseHeader responseHeader = - // (GetAllTopicConfigResponseHeader) response.readCustomHeader(); - - String content = this.brokerController.getTopicConfigManager().encode(); - if (content != null && content.length() > 0) { - try { - response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); - } catch (UnsupportedEncodingException e) { - log.error("", e); - - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("UnsupportedEncodingException " + e); - return response; - } - } else { - log.error("No topic in this broker, client: " + ctx.channel().remoteAddress()); - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("No topic in this broker"); - return response; - } - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - - return response; - } - - private RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - - log.info("updateBrokerConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - - byte[] body = request.getBody(); - if (body != null) { - try { - String bodyStr = new String(body, MixAll.DEFAULT_CHARSET); - Properties properties = MixAll.string2Properties(bodyStr); - if (properties != null) { - log.info("updateBrokerConfig, new config: " + properties + " client: " + ctx.channel().remoteAddress()); - this.brokerController.getConfiguration().update(properties); - if (properties.containsKey("brokerPermission")) { - this.brokerController.registerBrokerAll(false, false); - this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(); - } - } else { - log.error("string2Properties error"); - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("string2Properties error"); - return response; - } - } catch (UnsupportedEncodingException e) { - log.error("", e); - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("UnsupportedEncodingException " + e); - return response; - } - } - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand getBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) { - - final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerConfigResponseHeader.class); - final GetBrokerConfigResponseHeader responseHeader = (GetBrokerConfigResponseHeader) response.readCustomHeader(); - - String content = this.brokerController.getConfiguration().getAllConfigsFormatString(); - if (content != null && content.length() > 0) { - try { - response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); - } catch (UnsupportedEncodingException e) { - log.error("", e); - - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("UnsupportedEncodingException " + e); - return response; - } - } - - responseHeader.setVersion(this.brokerController.getConfiguration().getDataVersionJson()); - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class); - final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader(); - final SearchOffsetRequestHeader requestHeader = - (SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class); - - long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(), - requestHeader.getTimestamp()); - - responseHeader.setOffset(offset); - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand getMaxOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); - final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader(); - final GetMaxOffsetRequestHeader requestHeader = - (GetMaxOffsetRequestHeader) request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class); - - long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(requestHeader.getTopic(), requestHeader.getQueueId()); - - responseHeader.setOffset(offset); - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand getMinOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); - final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader(); - final GetMinOffsetRequestHeader requestHeader = - (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class); - - long offset = this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(), requestHeader.getQueueId()); - - responseHeader.setOffset(offset); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class); - final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) response.readCustomHeader(); - final GetEarliestMsgStoretimeRequestHeader requestHeader = - (GetEarliestMsgStoretimeRequestHeader) request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class); - - long timestamp = - this.brokerController.getMessageStore().getEarliestMessageTime(requestHeader.getTopic(), requestHeader.getQueueId()); - - responseHeader.setTimestamp(timestamp); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand getBrokerRuntimeInfo(ChannelHandlerContext ctx, RemotingCommand request) { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - - HashMap<String, String> runtimeInfo = this.prepareRuntimeInfo(); - KVTable kvTable = new KVTable(); - kvTable.setTable(runtimeInfo); - - byte[] body = kvTable.encode(); - response.setBody(body); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class); - - Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(// - requestBody.getConsumerGroup(), // - requestBody.getMqSet(), // - requestBody.getClientId()); - - LockBatchResponseBody responseBody = new LockBatchResponseBody(); - responseBody.setLockOKMQSet(lockOKMQSet); - - response.setBody(responseBody.encode()); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class); - - this.brokerController.getRebalanceLockManager().unlockBatch(// - requestBody.getConsumerGroup(), // - requestBody.getMqSet(), // - requestBody.getClientId()); - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - - log.info("updateAndCreateSubscriptionGroup called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - - SubscriptionGroupConfig config = RemotingSerializable.decode(request.getBody(), SubscriptionGroupConfig.class); - if (config != null) { - this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfig(config); - } - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand getAllSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - String content = this.brokerController.getSubscriptionGroupManager().encode(); - if (content != null && content.length() > 0) { - try { - response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); - } catch (UnsupportedEncodingException e) { - log.error("", e); - - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("UnsupportedEncodingException " + e); - return response; - } - } else { - log.error("No subscription group in this broker, client: " + ctx.channel().remoteAddress()); - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("No subscription group in this broker"); - return response; - } - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - - return response; - } - - private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - DeleteSubscriptionGroupRequestHeader requestHeader = - (DeleteSubscriptionGroupRequestHeader) request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class); - - log.info("deleteSubscriptionGroup called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - - this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(requestHeader.getGroupName()); - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - final GetTopicStatsInfoRequestHeader requestHeader = - (GetTopicStatsInfoRequestHeader) request.decodeCommandCustomHeader(GetTopicStatsInfoRequestHeader.class); - - final String topic = requestHeader.getTopic(); - TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); - if (null == topicConfig) { - response.setCode(ResponseCode.TOPIC_NOT_EXIST); - response.setRemark("topic[" + topic + "] not exist"); - return response; - } - - TopicStatsTable topicStatsTable = new TopicStatsTable(); - for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) { - MessageQueue mq = new MessageQueue(); - mq.setTopic(topic); - mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); - mq.setQueueId(i); - - TopicOffset topicOffset = new TopicOffset(); - long min = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, i); - if (min < 0) - min = 0; - - long max = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i); - if (max < 0) - max = 0; - - long timestamp = 0; - if (max > 0) { - timestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, max - 1); - } - - topicOffset.setMinOffset(min); - topicOffset.setMaxOffset(max); - topicOffset.setLastUpdateTimestamp(timestamp); - - topicStatsTable.getOffsetTable().put(mq, topicOffset); - } - - byte[] body = topicStatsTable.encode(); - response.setBody(body); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - final GetConsumerConnectionListRequestHeader requestHeader = - (GetConsumerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class); - - ConsumerGroupInfo consumerGroupInfo = - this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup()); - if (consumerGroupInfo != null) { - ConsumerConnection bodydata = new ConsumerConnection(); - bodydata.setConsumeFromWhere(consumerGroupInfo.getConsumeFromWhere()); - bodydata.setConsumeType(consumerGroupInfo.getConsumeType()); - bodydata.setMessageModel(consumerGroupInfo.getMessageModel()); - bodydata.getSubscriptionTable().putAll(consumerGroupInfo.getSubscriptionTable()); - - Iterator<Map.Entry<Channel, ClientChannelInfo>> it = consumerGroupInfo.getChannelInfoTable().entrySet().iterator(); - while (it.hasNext()) { - ClientChannelInfo info = it.next().getValue(); - Connection connection = new Connection(); - connection.setClientId(info.getClientId()); - connection.setLanguage(info.getLanguage()); - connection.setVersion(info.getVersion()); - connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getChannel())); - - bodydata.getConnectionSet().add(connection); - } - - byte[] body = bodydata.encode(); - response.setBody(body); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - - return response; - } - - response.setCode(ResponseCode.CONSUMER_NOT_ONLINE); - response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] not online"); - return response; - } - - private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - final GetProducerConnectionListRequestHeader requestHeader = - (GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class); - - ProducerConnection bodydata = new ProducerConnection(); - HashMap<Channel, ClientChannelInfo> channelInfoHashMap = - this.brokerController.getProducerManager().getGroupChannelTable().get(requestHeader.getProducerGroup()); - if (channelInfoHashMap != null) { - Iterator<Map.Entry<Channel, ClientChannelInfo>> it = channelInfoHashMap.entrySet().iterator(); - while (it.hasNext()) { - ClientChannelInfo info = it.next().getValue(); - Connection connection = new Connection(); - connection.setClientId(info.getClientId()); - connection.setLanguage(info.getLanguage()); - connection.setVersion(info.getVersion()); - connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getChannel())); - - bodydata.getConnectionSet().add(connection); - } - - byte[] body = bodydata.encode(); - response.setBody(body); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("the producer group[" + requestHeader.getProducerGroup() + "] not exist"); - return response; - } - - private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - final GetConsumeStatsRequestHeader requestHeader = - (GetConsumeStatsRequestHeader) request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class); - - ConsumeStats consumeStats = new ConsumeStats(); - - Set<String> topics = new HashSet<String>(); - if (UtilAll.isBlank(requestHeader.getTopic())) { - topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup()); - } else { - topics.add(requestHeader.getTopic()); - } - - for (String topic : topics) { - TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); - if (null == topicConfig) { - log.warn("consumeStats, topic config not exist, {}", topic); - continue; - } - - /** - - */ - { - SubscriptionData findSubscriptionData = - this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic); - - if (null == findSubscriptionData // - && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) { - log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", requestHeader.getConsumerGroup(), topic); - continue; - } - } - - for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { - MessageQueue mq = new MessageQueue(); - mq.setTopic(topic); - mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); - mq.setQueueId(i); - - OffsetWrapper offsetWrapper = new OffsetWrapper(); - - long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i); - if (brokerOffset < 0) - brokerOffset = 0; - - long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(// - requestHeader.getConsumerGroup(), // - topic, // - i); - if (consumerOffset < 0) - consumerOffset = 0; - - offsetWrapper.setBrokerOffset(brokerOffset); - offsetWrapper.setConsumerOffset(consumerOffset); - - - long timeOffset = consumerOffset - 1; - if (timeOffset >= 0) { - long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset); - if (lastTimestamp > 0) { - offsetWrapper.setLastTimestamp(lastTimestamp); - } - } - - consumeStats.getOffsetTable().put(mq, offsetWrapper); - } - - double consumeTps = this.brokerController.getBrokerStatsManager().tpsGroupGetNums(requestHeader.getConsumerGroup(), topic); - - consumeTps += consumeStats.getConsumeTps(); - consumeStats.setConsumeTps(consumeTps); - } - - byte[] body = consumeStats.encode(); - response.setBody(body); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - - String content = this.brokerController.getConsumerOffsetManager().encode(); - if (content != null && content.length() > 0) { - try { - response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); - } catch (UnsupportedEncodingException e) { - log.error("get all consumer offset from master error.", e); - - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("UnsupportedEncodingException " + e); - return response; - } - } else { - log.error("No consumer offset in this broker, client: " + ctx.channel().remoteAddress()); - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("No consumer offset in this broker"); - return response; - } - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - - return response; - } - - private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - - String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode(); - if (content != null && content.length() > 0) { - try { - response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); - } catch (UnsupportedEncodingException e) { - log.error("get all delay offset from master error.", e); - - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("UnsupportedEncodingException " + e); - return response; - } - } else { - log.error("No delay offset in this broker, client: " + ctx.channel().remoteAddress()); - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("No delay offset in this broker"); - return response; - } - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - - return response; - } - - public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final ResetOffsetRequestHeader requestHeader = - (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class); - log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}", - new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), - requestHeader.getTimestamp(), requestHeader.isForce()}); - boolean isC = false; - LanguageCode language = request.getLanguage(); - switch (language) { - case CPP: - isC = true; - break; - } - return this.brokerController.getBroker2Client().resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), - requestHeader.getTimestamp(), requestHeader.isForce(), isC); - } - - public RemotingCommand getConsumerStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final GetConsumerStatusRequestHeader requestHeader = - (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class); - - log.info("[get-consumer-status] get consumer status by {}. topic={}, group={}", - new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup()}); - - return this.brokerController.getBroker2Client().getConsumeStatus(requestHeader.getTopic(), requestHeader.getGroup(), - requestHeader.getClientAddr()); - } - - private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - QueryTopicConsumeByWhoRequestHeader requestHeader = - (QueryTopicConsumeByWhoRequestHeader) request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class); - - - HashSet<String> groups = this.brokerController.getConsumerManager().queryTopicConsumeByWho(requestHeader.getTopic()); - - Set<String> groupInOffset = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(requestHeader.getTopic()); - if (groupInOffset != null && !groupInOffset.isEmpty()) { - groups.addAll(groupInOffset); - } - - GroupList groupList = new GroupList(); - groupList.setGroupList(groups); - byte[] body = groupList.encode(); - - response.setBody(body); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand registerFilterServer(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class); - final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader) response.readCustomHeader(); - final RegisterFilterServerRequestHeader requestHeader = - (RegisterFilterServerRequestHeader) request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class); - - this.brokerController.getFilterServerManager().registerFilterServer(ctx.channel(), requestHeader.getFilterServerAddr()); - - responseHeader.setBrokerId(this.brokerController.getBrokerConfig().getBrokerId()); - responseHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - QueryConsumeTimeSpanRequestHeader requestHeader = - (QueryConsumeTimeSpanRequestHeader) request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class); - - final String topic = requestHeader.getTopic(); - TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); - if (null == topicConfig) { - response.setCode(ResponseCode.TOPIC_NOT_EXIST); - response.setRemark("topic[" + topic + "] not exist"); - return response; - } - - List<QueueTimeSpan> timeSpanSet = new ArrayList<QueueTimeSpan>(); - for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) { - QueueTimeSpan timeSpan = new QueueTimeSpan(); - MessageQueue mq = new MessageQueue(); - mq.setTopic(topic); - mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); - mq.setQueueId(i); - timeSpan.setMessageQueue(mq); - - long minTime = this.brokerController.getMessageStore().getEarliestMessageTime(topic, i); - timeSpan.setMinTimeStamp(minTime); - - long max = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i); - long maxTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, max - 1); - timeSpan.setMaxTimeStamp(maxTime); - - long consumeTime; - long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset( - requestHeader.getGroup(), topic, i); - if (consumerOffset > 0) { - consumeTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset - 1); - } else { - consumeTime = minTime; - } - timeSpan.setConsumeTimeStamp(consumeTime); - - long maxBrokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(requestHeader.getTopic(), i); - if (consumerOffset < maxBrokerOffset) { - long nextTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset); - timeSpan.setDelayTime(System.currentTimeMillis() - nextTime); - } - timeSpanSet.add(timeSpan); - } - - QueryConsumeTimeSpanBody queryConsumeTimeSpanBody = new QueryConsumeTimeSpanBody(); - queryConsumeTimeSpanBody.setConsumeTimeSpanSet(timeSpanSet); - response.setBody(queryConsumeTimeSpanBody.encode()); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand getSystemTopicListFromBroker(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - - Set<String> topics = this.brokerController.getTopicConfigManager().getSystemTopic(); - TopicList topicList = new TopicList(); - topicList.setTopicList(topics); - response.setBody(topicList.encode()); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - public RemotingCommand cleanExpiredConsumeQueue() { - log.warn("invoke cleanExpiredConsumeQueue start."); - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - brokerController.getMessageStore().cleanExpiredConsumerQueue(); - log.warn("invoke cleanExpiredConsumeQueue end."); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - public RemotingCommand cleanUnusedTopic() { - log.warn("invoke cleanUnusedTopic start."); - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - brokerController.getMessageStore().cleanUnusedTopic(brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); - log.warn("invoke cleanUnusedTopic end."); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - /** - - */ - private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final GetConsumerRunningInfoRequestHeader requestHeader = - (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); - - return this.callConsumer(RequestCode.GET_CONSUMER_RUNNING_INFO, request, requestHeader.getConsumerGroup(), - requestHeader.getClientId()); - } - - private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - QueryCorrectionOffsetHeader requestHeader = - (QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class); - - Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager() - .queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups()); - - Map<Integer, Long> compareOffset = - this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup()); - - if (compareOffset != null && !compareOffset.isEmpty()) { - for (Map.Entry<Integer, Long> entry : compareOffset.entrySet()) { - Integer queueId = entry.getKey(); - correctionOffset.put(queueId, - correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId)); - } - } - - QueryCorrectionOffsetBody body = new QueryCorrectionOffsetBody(); - body.setCorrectionOffsets(correctionOffset); - response.setBody(body.encode()); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader) request - .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class); - - request.getExtFields().put("brokerName", this.brokerController.getBrokerConfig().getBrokerName()); - SelectMappedBufferResult selectMappedBufferResult = null; - try { - MessageId messageId = MessageDecoder.decodeMessageId(requestHeader.getMsgId()); - selectMappedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(messageId.getOffset()); - - byte[] body = new byte[selectMappedBufferResult.getSize()]; - selectMappedBufferResult.getByteBuffer().get(body); - request.setBody(body); - } catch (UnknownHostException e) { - } finally { - if (selectMappedBufferResult != null) { - selectMappedBufferResult.release(); - } - } - - return this.callConsumer(RequestCode.CONSUME_MESSAGE_DIRECTLY, request, requestHeader.getConsumerGroup(), - requestHeader.getClientId()); - } - - private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - CloneGroupOffsetRequestHeader requestHeader = - (CloneGroupOffsetRequestHeader) request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class); - - Set<String> topics; - if (UtilAll.isBlank(requestHeader.getTopic())) { - topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getSrcGroup()); - } else { - topics = new HashSet<String>(); - topics.add(requestHeader.getTopic()); - } - - for (String topic : topics) { - TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); - if (null == topicConfig) { - log.warn("[cloneGroupOffset], topic config not exist, {}", topic); - continue; - } - - /** - - */ - if (!requestHeader.isOffline()) { - - SubscriptionData findSubscriptionData = - this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getSrcGroup(), topic); - if (this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getSrcGroup()) > 0 - && findSubscriptionData == null) { - log.warn("[cloneGroupOffset], the consumer group[{}], topic[{}] not exist", requestHeader.getSrcGroup(), topic); - continue; - } - } - - this.brokerController.getConsumerOffsetManager().cloneOffset(requestHeader.getSrcGroup(), requestHeader.getDestGroup(), - requestHeader.getTopic()); - } - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final ViewBrokerStatsDataRequestHeader requestHeader = - (ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class); - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore(); - - StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey()); - if (null == statsItem) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(String.format("The stats <%s> <%s> not exist", requestHeader.getStatsName(), requestHeader.getStatsKey())); - return response; - } - - BrokerStatsData brokerStatsData = new BrokerStatsData(); - - { - BrokerStatsItem it = new BrokerStatsItem(); - StatsSnapshot ss = statsItem.getStatsDataInMinute(); - it.setSum(ss.getSum()); - it.setTps(ss.getTps()); - it.setAvgpt(ss.getAvgpt()); - brokerStatsData.setStatsMinute(it); - } - - - { - BrokerStatsItem it = new BrokerStatsItem(); - StatsSnapshot ss = statsItem.getStatsDataInHour(); - it.setSum(ss.getSum()); - it.setTps(ss.getTps()); - it.setAvgpt(ss.getAvgpt()); - brokerStatsData.setStatsHour(it); - } - - - { - BrokerStatsItem it = new BrokerStatsItem(); - StatsSnapshot ss = statsItem.getStatsDataInDay(); - it.setSum(ss.getSum()); - it.setTps(ss.getTps()); - it.setAvgpt(ss.getAvgpt()); - brokerStatsData.setStatsDay(it); - } - - response.setBody(brokerStatsData.encode()); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private RemotingCommand fetchAllConsumeStatsInBroker(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - GetConsumeStatsInBrokerHeader requestHeader = - (GetConsumeStatsInBrokerHeader) request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class); - boolean isOrder = requestHeader.isOrder(); - ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroups = - brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable(); - - List<Map<String/* subscriptionGroupName */, List<ConsumeStats>>> brokerConsumeStatsList = - new ArrayList<Map<String, List<ConsumeStats>>>(); - - long totalDiff = 0L; - - for (String group : subscriptionGroups.keySet()) { - Map<String, List<ConsumeStats>> subscripTopicConsumeMap = new HashMap<String, List<ConsumeStats>>(); - Set<String> topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group); - List<ConsumeStats> consumeStatsList = new ArrayList<ConsumeStats>(); - for (String topic : topics) { - ConsumeStats consumeStats = new ConsumeStats(); - TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); - if (null == topicConfig) { - log.warn("consumeStats, topic config not exist, {}", topic); - continue; - } - - if (isOrder && !topicConfig.isOrder()) { - continue; - } - /** - - */ - { - SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic); - - if (null == findSubscriptionData // - && this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0) { - log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", group, topic); - continue; - } - } - - for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) { - MessageQueue mq = new MessageQueue(); - mq.setTopic(topic); - mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); - mq.setQueueId(i); - OffsetWrapper offsetWrapper = new OffsetWrapper(); - long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i); - if (brokerOffset < 0) - brokerOffset = 0; - long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(// - group, // - topic, // - i); - if (consumerOffset < 0) - consumerOffset = 0; - - offsetWrapper.setBrokerOffset(brokerOffset); - offsetWrapper.setConsumerOffset(consumerOffset); - - - long timeOffset = consumerOffset - 1; - if (timeOffset >= 0) { - long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset); - if (lastTimestamp > 0) { - offsetWrapper.setLastTimestamp(lastTimestamp); - } - } - consumeStats.getOffsetTable().put(mq, offsetWrapper); - } - double consumeTps = this.brokerController.getBrokerStatsManager().tpsGroupGetNums(group, topic); - consumeTps += consumeStats.getConsumeTps(); - consumeStats.setConsumeTps(consumeTps); - totalDiff += consumeStats.computeTotalDiff(); - consumeStatsList.add(consumeStats); - } - subscripTopicConsumeMap.put(group, consumeStatsList); - brokerConsumeStatsList.add(subscripTopicConsumeMap); - } - ConsumeStatsList consumeStats = new ConsumeStatsList(); - consumeStats.setBrokerAddr(brokerController.getBrokerAddr()); - consumeStats.setConsumeStatsList(brokerConsumeStatsList); - consumeStats.setTotalDiff(totalDiff); - response.setBody(consumeStats.encode()); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - private HashMap<String, String> prepareRuntimeInfo() { - HashMap<String, String> runtimeInfo = this.brokerController.getMessageStore().getRuntimeInfo(); - runtimeInfo.put("brokerVersionDesc", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION)); - runtimeInfo.put("brokerVersion", String.valueOf(MQVersion.CURRENT_VERSION)); - - runtimeInfo.put("msgPutTotalYesterdayMorning", - String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalYesterdayMorning())); - runtimeInfo.put("msgPutTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayMorning())); - runtimeInfo.put("msgPutTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayNow())); - - runtimeInfo.put("msgGetTotalYesterdayMorning", - String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalYesterdayMorning())); - runtimeInfo.put("msgGetTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayMorning())); - runtimeInfo.put("msgGetTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayNow())); - - runtimeInfo.put("sendThreadPoolQueueSize", String.valueOf(this.brokerController.getSendThreadPoolQueue().size())); - - runtimeInfo.put("sendThreadPoolQueueCapacity", - String.valueOf(this.brokerController.getBrokerConfig().getSendThreadPoolQueueCapacity())); - - runtimeInfo.put("pullThreadPoolQueueSize", String.valueOf(this.brokerController.getPullThreadPoolQueue().size())); - runtimeInfo.put("pullThreadPoolQueueCapacity", - String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity())); - - runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes())); - runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills())); - - runtimeInfo.put("sendThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4SendThreadPoolQueue())); - runtimeInfo.put("pullThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4PullThreadPoolQueue())); - runtimeInfo.put("earliestMessageTimeStamp", String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime())); - runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp())); - if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) { - DefaultMessageStore defaultMessageStore = (DefaultMessageStore) this.brokerController.getMessageStore(); - runtimeInfo.put("remainTransientStoreBufferNumbs", String.valueOf(defaultMessageStore.remainTransientStoreBufferNumbs())); - if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { - runtimeInfo.put("remainHowManyDataToCommit", MixAll.humanReadableByteCount(defaultMessageStore.getCommitLog().remainHowManyDataToCommit(), false)); - } - runtimeInfo.put("remainHowManyDataToFlush", MixAll.humanReadableByteCount(defaultMessageStore.getCommitLog().remainHowManyDataToFlush(), false)); - } - - java.io.File commitLogDir = new java.io.File(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); - if (commitLogDir.exists()) { - runtimeInfo.put("commitLogDirCapacity", String.format("Total : %s, Free : %s.", MixAll.humanReadableByteCount(commitLogDir.getTotalSpace(), false), MixAll.humanReadableByteCount(commitLogDir.getFreeSpace(), false))); - } - - return runtimeInfo; - } - - private RemotingCommand callConsumer(// - final int requestCode, // - final RemotingCommand request, // - final String consumerGroup, // - final String clientId) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId); - - if (null == clientChannelInfo) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(String.format("The Consumer <%s> <%s> not online", consumerGroup, clientId)); - return response; - } - - if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", // - clientId, // - MQVersion.getVersionDesc(clientChannelInfo.getVersion()))); - return response; - } - - try { - RemotingCommand newRequest = RemotingCommand.createRequestCommand(requestCode, null); - newRequest.setExtFields(request.getExtFields()); - newRequest.setBody(request.getBody()); - - RemotingCommand consumerResponse = - this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest); - return consumerResponse; - } catch (RemotingTimeoutException e) { - response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT); - response - .setRemark(String.format("consumer <%s> <%s> Timeout: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e))); - return response; - } catch (Exception e) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark( - String.format("invoke consumer <%s> <%s> Exception: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e))); - return response; - } - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ClientManageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ClientManageProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ClientManageProcessor.java deleted file mode 100644 index 254e63c..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ClientManageProcessor.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.processor; - -import com.alibaba.rocketmq.broker.BrokerController; -import com.alibaba.rocketmq.broker.client.ClientChannelInfo; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.constant.PermName; -import com.alibaba.rocketmq.common.protocol.RequestCode; -import com.alibaba.rocketmq.common.protocol.ResponseCode; -import com.alibaba.rocketmq.common.protocol.header.UnregisterClientRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.UnregisterClientResponseHeader; -import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumerData; -import com.alibaba.rocketmq.common.protocol.heartbeat.HeartbeatData; -import com.alibaba.rocketmq.common.protocol.heartbeat.ProducerData; -import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig; -import com.alibaba.rocketmq.common.sysflag.TopicSysFlag; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; -import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; -import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor; -import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; -import io.netty.channel.ChannelHandlerContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * @author shijia.wxr - */ -public class ClientManageProcessor implements NettyRequestProcessor { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private final BrokerController brokerController; - - public ClientManageProcessor(final BrokerController brokerController) { - this.brokerController = brokerController; - } - - @Override - public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { - switch (request.getCode()) { - case RequestCode.HEART_BEAT: - return this.heartBeat(ctx, request); - case RequestCode.UNREGISTER_CLIENT: - return this.unregisterClient(ctx, request); - default: - break; - } - return null; - } - - @Override - public boolean rejectRequest() { - return false; - } - - public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) { - RemotingCommand response = RemotingCommand.createResponseCommand(null); - HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); - ClientChannelInfo clientChannelInfo = new ClientChannelInfo( - ctx.channel(), - heartbeatData.getClientID(), - request.getLanguage(), - request.getVersion() - ); - - for (ConsumerData data : heartbeatData.getConsumerDataSet()) { - SubscriptionGroupConfig subscriptionGroupConfig = - this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig( - data.getGroupName()); - boolean isNotifyConsumerIdsChangedEnable = true; - if (null != subscriptionGroupConfig) { - isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable(); - int topicSysFlag = 0; - if (data.isUnitMode()) { - topicSysFlag = TopicSysFlag.buildSysFlag(false, true); - } - String newTopic = MixAll.getRetryTopic(data.getGroupName()); - this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( - newTopic, - subscriptionGroupConfig.getRetryQueueNums(), - PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); - } - - boolean changed = this.brokerController.getConsumerManager().registerConsumer( - data.getGroupName(), - clientChannelInfo, - data.getConsumeType(), - data.getMessageModel(), - data.getConsumeFromWhere(), - data.getSubscriptionDataSet(), - isNotifyConsumerIdsChangedEnable - ); - - if (changed) { - log.info("registerConsumer info changed {} {}", - data.toString(), - RemotingHelper.parseChannelRemoteAddr(ctx.channel()) - ); - } - } - - for (ProducerData data : heartbeatData.getProducerDataSet()) { - this.brokerController.getProducerManager().registerProducer(data.getGroupName(), - clientChannelInfo); - } - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { - final RemotingCommand response = - RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class); - final UnregisterClientRequestHeader requestHeader = - (UnregisterClientRequestHeader) request - .decodeCommandCustomHeader(UnregisterClientRequestHeader.class); - - ClientChannelInfo clientChannelInfo = new ClientChannelInfo( - ctx.channel(), - requestHeader.getClientID(), - request.getLanguage(), - request.getVersion()); - { - final String group = requestHeader.getProducerGroup(); - if (group != null) { - this.brokerController.getProducerManager().unregisterProducer(group, clientChannelInfo); - } - } - - { - final String group = requestHeader.getConsumerGroup(); - if (group != null) { - SubscriptionGroupConfig subscriptionGroupConfig = - this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group); - boolean isNotifyConsumerIdsChangedEnable = true; - if (null != subscriptionGroupConfig) { - isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable(); - } - this.brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable); - } - } - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ConsumerManageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ConsumerManageProcessor.java deleted file mode 100644 index d7f9198..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ConsumerManageProcessor.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.processor; - -import com.alibaba.rocketmq.broker.BrokerController; -import com.alibaba.rocketmq.broker.client.ConsumerGroupInfo; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.protocol.RequestCode; -import com.alibaba.rocketmq.common.protocol.ResponseCode; -import com.alibaba.rocketmq.common.protocol.header.*; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; -import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; -import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor; -import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; -import io.netty.channel.ChannelHandlerContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - - -/** - * @author shijia.wxr - */ -public class ConsumerManageProcessor implements NettyRequestProcessor { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - - private final BrokerController brokerController; - - - public ConsumerManageProcessor(final BrokerController brokerController) { - this.brokerController = brokerController; - } - - @Override - public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { - switch (request.getCode()) { - case RequestCode.GET_CONSUMER_LIST_BY_GROUP: - return this.getConsumerListByGroup(ctx, request); - case RequestCode.UPDATE_CONSUMER_OFFSET: - return this.updateConsumerOffset(ctx, request); - case RequestCode.QUERY_CONSUMER_OFFSET: - return this.queryConsumerOffset(ctx, request); - default: - break; - } - return null; - } - - @Override - public boolean rejectRequest() { - return false; - } - - - public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { - final RemotingCommand response = - RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class); - final GetConsumerListByGroupRequestHeader requestHeader = - (GetConsumerListByGroupRequestHeader) request - .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class); - - ConsumerGroupInfo consumerGroupInfo = - this.brokerController.getConsumerManager().getConsumerGroupInfo( - requestHeader.getConsumerGroup()); - if (consumerGroupInfo != null) { - List<String> clientIds = consumerGroupInfo.getAllClientId(); - if (!clientIds.isEmpty()) { - GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody(); - body.setConsumerIdList(clientIds); - response.setBody(body.encode()); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } else { - log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(), - RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - } - } else { - log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(), - RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - } - - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup()); - return response; - } - - private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { - final RemotingCommand response = - RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class); - final UpdateConsumerOffsetRequestHeader requestHeader = - (UpdateConsumerOffsetRequestHeader) request - .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); - this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(), - requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - - - private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { - final RemotingCommand response = - RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class); - final QueryConsumerOffsetResponseHeader responseHeader = - (QueryConsumerOffsetResponseHeader) response.readCustomHeader(); - final QueryConsumerOffsetRequestHeader requestHeader = - (QueryConsumerOffsetRequestHeader) request - .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class); - - long offset = - this.brokerController.getConsumerOffsetManager().queryOffset( - requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); - - - if (offset >= 0) { - responseHeader.setOffset(offset); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - } else { - long minOffset = - this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(), - requestHeader.getQueueId()); - if (minOffset <= 0 - && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset( - requestHeader.getTopic(), requestHeader.getQueueId(), 0)) { - responseHeader.setOffset(0L); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - } else { - response.setCode(ResponseCode.QUERY_NOT_FOUND); - response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first"); - } - } - - return response; - } -}
