http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java index 146770a..2971b6c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java @@ -6,38 +6,34 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.pagecache; -import org.apache.rocketmq.store.QueryMessageResult; import io.netty.channel.FileRegion; import io.netty.util.AbstractReferenceCounted; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import java.util.List; - +import org.apache.rocketmq.store.QueryMessageResult; public class QueryMessageTransfer extends AbstractReferenceCounted implements FileRegion { private final ByteBuffer byteBufferHeader; private final QueryMessageResult queryMessageResult; private long transfered; // the bytes which was transfered already - public QueryMessageTransfer(ByteBuffer byteBufferHeader, QueryMessageResult queryMessageResult) { this.byteBufferHeader = byteBufferHeader; this.queryMessageResult = queryMessageResult; } - @Override public long position() { int pos = byteBufferHeader.position();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java index 601e2f3..bac941d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ /** @@ -20,12 +20,16 @@ */ package org.apache.rocketmq.broker.plugin; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; -import org.apache.rocketmq.store.*; - import java.util.HashMap; import java.util.Set; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.store.GetMessageResult; +import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.QueryMessageResult; +import org.apache.rocketmq.store.SelectMappedBufferResult; public abstract class AbstractPluginMessageStore implements MessageStore { protected MessageStore next = null; @@ -83,7 +87,7 @@ public abstract class AbstractPluginMessageStore implements MessageStore { @Override public GetMessageResult getMessage(String group, String topic, int queueId, long offset, - int maxMsgNums, SubscriptionData subscriptionData) { + int maxMsgNums, SubscriptionData subscriptionData) { return next.getMessage(group, topic, queueId, offset, maxMsgNums, subscriptionData); } @@ -174,7 +178,7 @@ public abstract class AbstractPluginMessageStore implements MessageStore { @Override public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, - long end) { + long end) { return next.queryMessage(topic, key, maxNum, begin, end); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java index 42793ae..294bf8c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ /** @@ -20,14 +20,13 @@ */ package org.apache.rocketmq.broker.plugin; -import org.apache.rocketmq.store.MessageStore; - import java.io.IOException; import java.lang.reflect.Constructor; +import org.apache.rocketmq.store.MessageStore; public final class MessageStoreFactory { public final static MessageStore build(MessageStorePluginContext context, MessageStore messageStore) - throws IOException { + throws IOException { String plugin = context.getBrokerConfig().getMessageStorePlugIn(); if (plugin != null && plugin.trim().length() != 0) { String[] pluginClasses = plugin.split(","); @@ -35,12 +34,12 @@ public final class MessageStoreFactory { String pluginClass = pluginClasses[i]; try { @SuppressWarnings("unchecked") - Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>) Class.forName(pluginClass); + Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>)Class.forName(pluginClass); Constructor<AbstractPluginMessageStore> construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class); messageStore = construct.newInstance(context, messageStore); } catch (Throwable e) { throw new RuntimeException(String.format( - "Initialize plugin's class %s not found!", pluginClass), e); + "Initialize plugin's class %s not found!", pluginClass), e); } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java index 32af402..fcab1e6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ /** @@ -32,8 +32,8 @@ public class MessageStorePluginContext { private BrokerConfig brokerConfig; public MessageStorePluginContext(MessageStoreConfig messageStoreConfig, - BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener, - BrokerConfig brokerConfig) { + BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener, + BrokerConfig brokerConfig) { super(); this.messageStoreConfig = messageStoreConfig; this.brokerStatsManager = brokerStatsManager; @@ -57,5 +57,4 @@ public class MessageStorePluginContext { return brokerConfig; } - } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index f04e86c..75e5766 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -6,16 +6,22 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.processor; +import io.netty.channel.ChannelHandlerContext; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import java.util.Map; +import java.util.Random; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageHook; @@ -42,17 +48,9 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; -import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.List; -import java.util.Map; -import java.util.Random; - - public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor { protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -62,16 +60,15 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces protected final SocketAddress storeHost; private List<SendMessageHook> sendMessageHookList; - public AbstractSendMessageProcessor(final BrokerController brokerController) { this.brokerController = brokerController; this.storeHost = - new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController - .getNettyServerConfig().getListenPort()); + new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController + .getNettyServerConfig().getListenPort()); } protected SendMessageContext buildMsgContext(ChannelHandlerContext ctx, - SendMessageRequestHeader requestHeader) { + SendMessageRequestHeader requestHeader) { if (!this.hasSendMessageHook()) { return null; } @@ -91,7 +88,6 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces properties.put(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); requestHeader.setProperties(MessageDecoder.messageProperties2String(properties)); - if (uniqueKey == null) { uniqueKey = ""; } @@ -104,7 +100,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces } protected MessageExtBrokerInner buildInnerMsg(final ChannelHandlerContext ctx, - final SendMessageRequestHeader requestHeader, final byte[] body, TopicConfig topicConfig) { + final SendMessageRequestHeader requestHeader, final byte[] body, TopicConfig topicConfig) { int queueIdInt = requestHeader.getQueueId(); if (queueIdInt < 0) { queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); @@ -120,10 +116,10 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces msgInner.setBody(body); msgInner.setFlag(requestHeader.getFlag()); MessageAccessor.setProperties(msgInner, - MessageDecoder.string2messageProperties(requestHeader.getProperties())); + MessageDecoder.string2messageProperties(requestHeader.getProperties())); msgInner.setPropertiesString(requestHeader.getProperties()); msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), - msgInner.getTags())); + msgInner.getTags())); msgInner.setQueueId(queueIdInt); msgInner.setSysFlag(sysFlag); @@ -131,7 +127,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces msgInner.setBornHost(ctx.channel().remoteAddress()); msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader - .getReconsumeTimes()); + .getReconsumeTimes()); return msgInner; } @@ -140,8 +136,8 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces } protected RemotingCommand msgContentCheck(final ChannelHandlerContext ctx, - final SendMessageRequestHeader requestHeader, RemotingCommand request, - final RemotingCommand response) { + final SendMessageRequestHeader requestHeader, RemotingCommand request, + final RemotingCommand response) { if (requestHeader.getTopic().length() > Byte.MAX_VALUE) { log.warn("putMessage message topic length too long " + requestHeader.getTopic().length()); response.setCode(ResponseCode.MESSAGE_ILLEGAL); @@ -149,13 +145,13 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces } if (requestHeader.getProperties() != null && requestHeader.getProperties().length() > Short.MAX_VALUE) { log.warn("putMessage message properties length too long " - + requestHeader.getProperties().length()); + + requestHeader.getProperties().length()); response.setCode(ResponseCode.MESSAGE_ILLEGAL); return response; } if (request.getBody().length > DBMsgConstants.MAX_BODY_SIZE) { log.warn(" topic {} msg body size {} from {}", requestHeader.getTopic(), - request.getBody().length, ChannelUtil.getRemoteIp(ctx.channel())); + request.getBody().length, ChannelUtil.getRemoteIp(ctx.channel())); response.setRemark("msg body must be less 64KB"); response.setCode(ResponseCode.MESSAGE_ILLEGAL); return response; @@ -164,12 +160,12 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces } protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, - final SendMessageRequestHeader requestHeader, final RemotingCommand response) { + final SendMessageRequestHeader requestHeader, final RemotingCommand response) { if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission()) - && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) { + && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() - + "] sending message is forbidden"); + + "] sending message is forbidden"); return response; } if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) { @@ -181,7 +177,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces } TopicConfig topicConfig = - this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); + this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (null == topicConfig) { int topicSysFlag = 0; if (requestHeader.isUnitMode()) { @@ -193,26 +189,26 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces } log.warn("the topic " + requestHeader.getTopic() + " not exist, producer: " - + ctx.channel().remoteAddress()); + + ctx.channel().remoteAddress()); topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(// - requestHeader.getTopic(), // - requestHeader.getDefaultTopic(), // - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), // - requestHeader.getDefaultTopicQueueNums(), topicSysFlag); + requestHeader.getTopic(), // + requestHeader.getDefaultTopic(), // + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), // + requestHeader.getDefaultTopicQueueNums(), topicSysFlag); if (null == topicConfig) { if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { topicConfig = - this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( - requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ, - topicSysFlag); + this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( + requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ, + topicSysFlag); } } if (null == topicConfig) { response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!" - + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); + + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; } } @@ -221,9 +217,9 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums()); if (queueIdInt >= idValid) { String errorInfo = String.format("request queueId[%d] is illagal, %s Producer: %s", - queueIdInt, - topicConfig.toString(), - RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + queueIdInt, + topicConfig.toString(), + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(errorInfo); response.setCode(ResponseCode.SYSTEM_ERROR); @@ -239,7 +235,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces } protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request, - final RemotingCommand response) { + final RemotingCommand response) { if (!request.isOnewayRPC()) { try { ctx.writeAndFlush(response); @@ -252,7 +248,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces } public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request, - SendMessageContext context) { + SendMessageContext context) { if (hasSendMessageHook()) { for (SendMessageHook hook : this.sendMessageHookList) { try { @@ -280,20 +276,20 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces } protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request) - throws RemotingCommandException { + throws RemotingCommandException { SendMessageRequestHeaderV2 requestHeaderV2 = null; SendMessageRequestHeader requestHeader = null; switch (request.getCode()) { case RequestCode.SEND_MESSAGE_V2: requestHeaderV2 = - (SendMessageRequestHeaderV2) request - .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); + (SendMessageRequestHeaderV2)request + .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); case RequestCode.SEND_MESSAGE: if (null == requestHeaderV2) { requestHeader = - (SendMessageRequestHeader) request - .decodeCommandCustomHeader(SendMessageRequestHeader.class); + (SendMessageRequestHeader)request + .decodeCommandCustomHeader(SendMessageRequestHeader.class); } else { requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2); } @@ -309,7 +305,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces try { if (response != null) { final SendMessageResponseHeader responseHeader = - (SendMessageResponseHeader) response.readCustomHeader(); + (SendMessageResponseHeader)response.readCustomHeader(); context.setMsgId(responseHeader.getMsgId()); context.setQueueId(responseHeader.getQueueId()); context.setQueueOffset(responseHeader.getQueueOffset()); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index d2d4bc7..722bec2 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -16,6 +16,19 @@ */ package org.apache.rocketmq.broker.processor; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import java.io.UnsupportedEncodingException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; @@ -33,8 +46,48 @@ import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.*; -import org.apache.rocketmq.common.protocol.header.*; +import org.apache.rocketmq.common.protocol.body.BrokerStatsData; +import org.apache.rocketmq.common.protocol.body.BrokerStatsItem; +import org.apache.rocketmq.common.protocol.body.Connection; +import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; +import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.body.GroupList; +import org.apache.rocketmq.common.protocol.body.KVTable; +import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody; +import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody; +import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody; +import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody; +import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.common.protocol.body.TopicList; +import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; +import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader; +import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader; +import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader; +import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetAllTopicConfigResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetBrokerConfigResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader; +import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader; +import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader; +import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader; +import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; @@ -50,17 +103,9 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.SelectMappedBufferResult; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.UnsupportedEncodingException; -import java.net.UnknownHostException; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - - public class AdminBrokerProcessor implements NettyRequestProcessor { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; @@ -157,10 +202,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final CreateTopicRequestHeader requestHeader = - (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class); + (CreateTopicRequestHeader)request.decodeCommandCustomHeader(CreateTopicRequestHeader.class); log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) { String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; log.warn(errorMsg); @@ -193,13 +237,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand deleteTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); DeleteTopicRequestHeader requestHeader = - (DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class); + (DeleteTopicRequestHeader)request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class); log.info("deleteTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic()); this.brokerController.getMessageStore() - .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); + .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); @@ -274,7 +318,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand getBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerConfigResponseHeader.class); - final GetBrokerConfigResponseHeader responseHeader = (GetBrokerConfigResponseHeader) response.readCustomHeader(); + final GetBrokerConfigResponseHeader responseHeader = (GetBrokerConfigResponseHeader)response.readCustomHeader(); String content = this.brokerController.getConfiguration().getAllConfigsFormatString(); if (content != null && content.length() > 0) { @@ -298,12 +342,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class); - final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader(); + final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader)response.readCustomHeader(); final SearchOffsetRequestHeader requestHeader = - (SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class); + (SearchOffsetRequestHeader)request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class); long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(), - requestHeader.getTimestamp()); + requestHeader.getTimestamp()); responseHeader.setOffset(offset); @@ -314,9 +358,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand getMaxOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); - final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader(); + final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader)response.readCustomHeader(); final GetMaxOffsetRequestHeader requestHeader = - (GetMaxOffsetRequestHeader) request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class); + (GetMaxOffsetRequestHeader)request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class); long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(requestHeader.getTopic(), requestHeader.getQueueId()); @@ -329,9 +373,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand getMinOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); - final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader(); + final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader)response.readCustomHeader(); final GetMinOffsetRequestHeader requestHeader = - (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class); + (GetMinOffsetRequestHeader)request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class); long offset = this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(), requestHeader.getQueueId()); @@ -343,12 +387,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class); - final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) response.readCustomHeader(); + final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader)response.readCustomHeader(); final GetEarliestMsgStoretimeRequestHeader requestHeader = - (GetEarliestMsgStoretimeRequestHeader) request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class); + (GetEarliestMsgStoretimeRequestHeader)request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class); long timestamp = - this.brokerController.getMessageStore().getEarliestMessageTime(requestHeader.getTopic(), requestHeader.getQueueId()); + this.brokerController.getMessageStore().getEarliestMessageTime(requestHeader.getTopic(), requestHeader.getQueueId()); responseHeader.setTimestamp(timestamp); response.setCode(ResponseCode.SUCCESS); @@ -375,9 +419,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class); Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(// - requestBody.getConsumerGroup(), // - requestBody.getMqSet(), // - requestBody.getClientId()); + requestBody.getConsumerGroup(), // + requestBody.getMqSet(), // + requestBody.getClientId()); LockBatchResponseBody responseBody = new LockBatchResponseBody(); responseBody.setLockOKMQSet(lockOKMQSet); @@ -393,9 +437,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class); this.brokerController.getRebalanceLockManager().unlockBatch(// - requestBody.getConsumerGroup(), // - requestBody.getMqSet(), // - requestBody.getClientId()); + requestBody.getConsumerGroup(), // + requestBody.getMqSet(), // + requestBody.getClientId()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); @@ -403,7 +447,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { + throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); log.info("updateAndCreateSubscriptionGroup called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); @@ -447,7 +491,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); DeleteSubscriptionGroupRequestHeader requestHeader = - (DeleteSubscriptionGroupRequestHeader) request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class); + (DeleteSubscriptionGroupRequestHeader)request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class); log.info("deleteSubscriptionGroup called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); @@ -461,7 +505,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetTopicStatsInfoRequestHeader requestHeader = - (GetTopicStatsInfoRequestHeader) request.decodeCommandCustomHeader(GetTopicStatsInfoRequestHeader.class); + (GetTopicStatsInfoRequestHeader)request.decodeCommandCustomHeader(GetTopicStatsInfoRequestHeader.class); final String topic = requestHeader.getTopic(); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); @@ -509,10 +553,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetConsumerConnectionListRequestHeader requestHeader = - (GetConsumerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class); + (GetConsumerConnectionListRequestHeader)request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class); ConsumerGroupInfo consumerGroupInfo = - this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup()); + this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup()); if (consumerGroupInfo != null) { ConsumerConnection bodydata = new ConsumerConnection(); bodydata.setConsumeFromWhere(consumerGroupInfo.getConsumeFromWhere()); @@ -548,11 +592,11 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetProducerConnectionListRequestHeader requestHeader = - (GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class); + (GetProducerConnectionListRequestHeader)request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class); ProducerConnection bodydata = new ProducerConnection(); HashMap<Channel, ClientChannelInfo> channelInfoHashMap = - this.brokerController.getProducerManager().getGroupChannelTable().get(requestHeader.getProducerGroup()); + this.brokerController.getProducerManager().getGroupChannelTable().get(requestHeader.getProducerGroup()); if (channelInfoHashMap != null) { Iterator<Map.Entry<Channel, ClientChannelInfo>> it = channelInfoHashMap.entrySet().iterator(); while (it.hasNext()) { @@ -581,7 +625,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetConsumeStatsRequestHeader requestHeader = - (GetConsumeStatsRequestHeader) request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class); + (GetConsumeStatsRequestHeader)request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class); ConsumeStats consumeStats = new ConsumeStats(); @@ -604,10 +648,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { */ { SubscriptionData findSubscriptionData = - this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic); + this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic); if (null == findSubscriptionData // - && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) { + && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) { log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", requestHeader.getConsumerGroup(), topic); continue; } @@ -626,16 +670,15 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { brokerOffset = 0; long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(// - requestHeader.getConsumerGroup(), // - topic, // - i); + requestHeader.getConsumerGroup(), // + topic, // + i); if (consumerOffset < 0) consumerOffset = 0; offsetWrapper.setBrokerOffset(brokerOffset); offsetWrapper.setConsumerOffset(consumerOffset); - long timeOffset = consumerOffset - 1; if (timeOffset >= 0) { long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset); @@ -690,7 +733,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); - String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode(); + String content = ((DefaultMessageStore)this.brokerController.getMessageStore()).getScheduleMessageService().encode(); if (content != null && content.length() > 0) { try { response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); @@ -716,10 +759,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final ResetOffsetRequestHeader requestHeader = - (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class); + (ResetOffsetRequestHeader)request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class); log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}", - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), - requestHeader.getTimestamp(), requestHeader.isForce()); + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), + requestHeader.getTimestamp(), requestHeader.isForce()); boolean isC = false; LanguageCode language = request.getLanguage(); switch (language) { @@ -728,25 +771,24 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { break; } return this.brokerController.getBroker2Client().resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), - requestHeader.getTimestamp(), requestHeader.isForce(), isC); + requestHeader.getTimestamp(), requestHeader.isForce(), isC); } public RemotingCommand getConsumerStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final GetConsumerStatusRequestHeader requestHeader = - (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class); + (GetConsumerStatusRequestHeader)request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class); log.info("[get-consumer-status] get consumer status by {}. topic={}, group={}", - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup()); + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup()); return this.brokerController.getBroker2Client().getConsumeStatus(requestHeader.getTopic(), requestHeader.getGroup(), - requestHeader.getClientAddr()); + requestHeader.getClientAddr()); } private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); QueryTopicConsumeByWhoRequestHeader requestHeader = - (QueryTopicConsumeByWhoRequestHeader) request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class); - + (QueryTopicConsumeByWhoRequestHeader)request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class); HashSet<String> groups = this.brokerController.getConsumerManager().queryTopicConsumeByWho(requestHeader.getTopic()); @@ -767,9 +809,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand registerFilterServer(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class); - final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader) response.readCustomHeader(); + final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader)response.readCustomHeader(); final RegisterFilterServerRequestHeader requestHeader = - (RegisterFilterServerRequestHeader) request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class); + (RegisterFilterServerRequestHeader)request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class); this.brokerController.getFilterServerManager().registerFilterServer(ctx.channel(), requestHeader.getFilterServerAddr()); @@ -784,7 +826,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); QueryConsumeTimeSpanRequestHeader requestHeader = - (QueryConsumeTimeSpanRequestHeader) request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class); + (QueryConsumeTimeSpanRequestHeader)request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class); final String topic = requestHeader.getTopic(); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); @@ -812,7 +854,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { long consumeTime; long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset( - requestHeader.getGroup(), topic, i); + requestHeader.getGroup(), topic, i); if (consumerOffset > 0) { consumeTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset - 1); } else { @@ -837,7 +879,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } private RemotingCommand getSystemTopicListFromBroker(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { + throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); Set<String> topics = this.brokerController.getTopicConfigManager().getSystemTopic(); @@ -874,28 +916,28 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { */ private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final GetConsumerRunningInfoRequestHeader requestHeader = - (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); + (GetConsumerRunningInfoRequestHeader)request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); return this.callConsumer(RequestCode.GET_CONSUMER_RUNNING_INFO, request, requestHeader.getConsumerGroup(), - requestHeader.getClientId()); + requestHeader.getClientId()); } private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); QueryCorrectionOffsetHeader requestHeader = - (QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class); + (QueryCorrectionOffsetHeader)request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class); Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager() - .queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups()); + .queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups()); Map<Integer, Long> compareOffset = - this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup()); + this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup()); if (compareOffset != null && !compareOffset.isEmpty()) { for (Map.Entry<Integer, Long> entry : compareOffset.entrySet()) { Integer queueId = entry.getKey(); correctionOffset.put(queueId, - correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId)); + correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId)); } } @@ -908,8 +950,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader) request - .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class); + final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader)request + .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class); request.getExtFields().put("brokerName", this.brokerController.getBrokerConfig().getBrokerName()); SelectMappedBufferResult selectMappedBufferResult = null; @@ -928,13 +970,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } return this.callConsumer(RequestCode.CONSUME_MESSAGE_DIRECTLY, request, requestHeader.getConsumerGroup(), - requestHeader.getClientId()); + requestHeader.getClientId()); } private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); CloneGroupOffsetRequestHeader requestHeader = - (CloneGroupOffsetRequestHeader) request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class); + (CloneGroupOffsetRequestHeader)request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class); Set<String> topics; if (UtilAll.isBlank(requestHeader.getTopic())) { @@ -957,16 +999,16 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { if (!requestHeader.isOffline()) { SubscriptionData findSubscriptionData = - this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getSrcGroup(), topic); + this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getSrcGroup(), topic); if (this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getSrcGroup()) > 0 - && findSubscriptionData == null) { + && findSubscriptionData == null) { log.warn("[cloneGroupOffset], the consumer group[{}], topic[{}] not exist", requestHeader.getSrcGroup(), topic); continue; } } this.brokerController.getConsumerOffsetManager().cloneOffset(requestHeader.getSrcGroup(), requestHeader.getDestGroup(), - requestHeader.getTopic()); + requestHeader.getTopic()); } response.setCode(ResponseCode.SUCCESS); @@ -976,9 +1018,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final ViewBrokerStatsDataRequestHeader requestHeader = - (ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class); + (ViewBrokerStatsDataRequestHeader)request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(null); - DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore(); + DefaultMessageStore messageStore = (DefaultMessageStore)this.brokerController.getMessageStore(); StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey()); if (null == statsItem) { @@ -998,7 +1040,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { brokerStatsData.setStatsMinute(it); } - { BrokerStatsItem it = new BrokerStatsItem(); StatsSnapshot ss = statsItem.getStatsDataInHour(); @@ -1008,7 +1049,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { brokerStatsData.setStatsHour(it); } - { BrokerStatsItem it = new BrokerStatsItem(); StatsSnapshot ss = statsItem.getStatsDataInDay(); @@ -1025,16 +1065,16 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } private RemotingCommand fetchAllConsumeStatsInBroker(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { + throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); GetConsumeStatsInBrokerHeader requestHeader = - (GetConsumeStatsInBrokerHeader) request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class); + (GetConsumeStatsInBrokerHeader)request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class); boolean isOrder = requestHeader.isOrder(); ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroups = - brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable(); + brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable(); List<Map<String/* subscriptionGroupName */, List<ConsumeStats>>> brokerConsumeStatsList = - new ArrayList<Map<String, List<ConsumeStats>>>(); + new ArrayList<Map<String, List<ConsumeStats>>>(); long totalDiff = 0L; @@ -1060,7 +1100,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic); if (null == findSubscriptionData // - && this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0) { + && this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0) { log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", group, topic); continue; } @@ -1076,16 +1116,15 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { if (brokerOffset < 0) brokerOffset = 0; long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(// - group, // - topic, // - i); + group, // + topic, // + i); if (consumerOffset < 0) consumerOffset = 0; offsetWrapper.setBrokerOffset(brokerOffset); offsetWrapper.setConsumerOffset(consumerOffset); - long timeOffset = consumerOffset - 1; if (timeOffset >= 0) { long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset); @@ -1120,23 +1159,23 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { runtimeInfo.put("brokerVersion", String.valueOf(MQVersion.CURRENT_VERSION)); runtimeInfo.put("msgPutTotalYesterdayMorning", - String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalYesterdayMorning())); + String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalYesterdayMorning())); runtimeInfo.put("msgPutTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayMorning())); runtimeInfo.put("msgPutTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayNow())); runtimeInfo.put("msgGetTotalYesterdayMorning", - String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalYesterdayMorning())); + String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalYesterdayMorning())); runtimeInfo.put("msgGetTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayMorning())); runtimeInfo.put("msgGetTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayNow())); runtimeInfo.put("sendThreadPoolQueueSize", String.valueOf(this.brokerController.getSendThreadPoolQueue().size())); runtimeInfo.put("sendThreadPoolQueueCapacity", - String.valueOf(this.brokerController.getBrokerConfig().getSendThreadPoolQueueCapacity())); + String.valueOf(this.brokerController.getBrokerConfig().getSendThreadPoolQueueCapacity())); runtimeInfo.put("pullThreadPoolQueueSize", String.valueOf(this.brokerController.getPullThreadPoolQueue().size())); runtimeInfo.put("pullThreadPoolQueueCapacity", - String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity())); + String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity())); runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes())); runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills())); @@ -1146,7 +1185,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { runtimeInfo.put("earliestMessageTimeStamp", String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime())); runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp())); if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) { - DefaultMessageStore defaultMessageStore = (DefaultMessageStore) this.brokerController.getMessageStore(); + DefaultMessageStore defaultMessageStore = (DefaultMessageStore)this.brokerController.getMessageStore(); runtimeInfo.put("remainTransientStoreBufferNumbs", String.valueOf(defaultMessageStore.remainTransientStoreBufferNumbs())); if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { runtimeInfo.put("remainHowManyDataToCommit", MixAll.humanReadableByteCount(defaultMessageStore.getCommitLog().remainHowManyDataToCommit(), false)); @@ -1163,10 +1202,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } private RemotingCommand callConsumer(// - final int requestCode, // - final RemotingCommand request, // - final String consumerGroup, // - final String clientId) throws RemotingCommandException { + final int requestCode, // + final RemotingCommand request, // + final String consumerGroup, // + final String clientId) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId); @@ -1179,8 +1218,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", // - clientId, // - MQVersion.getVersionDesc(clientChannelInfo.getVersion()))); + clientId, // + MQVersion.getVersionDesc(clientChannelInfo.getVersion()))); return response; } @@ -1193,12 +1232,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } catch (RemotingTimeoutException e) { response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT); response - .setRemark(String.format("consumer <%s> <%s> Timeout: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e))); + .setRemark(String.format("consumer <%s> <%s> Timeout: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e))); return response; } catch (Exception e) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark( - String.format("invoke consumer <%s> <%s> Exception: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e))); + String.format("invoke consumer <%s> <%s> Exception: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e))); return response; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java index 62de995..717afaf 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.broker.processor; +import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.common.MixAll; @@ -34,11 +35,9 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class ClientManageProcessor implements NettyRequestProcessor { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; @@ -49,7 +48,7 @@ public class ClientManageProcessor implements NettyRequestProcessor { @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { + throws RemotingCommandException { switch (request.getCode()) { case RequestCode.HEART_BEAT: return this.heartBeat(ctx, request); @@ -70,16 +69,16 @@ public class ClientManageProcessor implements NettyRequestProcessor { RemotingCommand response = RemotingCommand.createResponseCommand(null); HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); ClientChannelInfo clientChannelInfo = new ClientChannelInfo( - ctx.channel(), - heartbeatData.getClientID(), - request.getLanguage(), - request.getVersion() + ctx.channel(), + heartbeatData.getClientID(), + request.getLanguage(), + request.getVersion() ); for (ConsumerData data : heartbeatData.getConsumerDataSet()) { SubscriptionGroupConfig subscriptionGroupConfig = - this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig( - data.getGroupName()); + this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig( + data.getGroupName()); boolean isNotifyConsumerIdsChangedEnable = true; if (null != subscriptionGroupConfig) { isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable(); @@ -89,32 +88,32 @@ public class ClientManageProcessor implements NettyRequestProcessor { } String newTopic = MixAll.getRetryTopic(data.getGroupName()); this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( - newTopic, - subscriptionGroupConfig.getRetryQueueNums(), - PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); + newTopic, + subscriptionGroupConfig.getRetryQueueNums(), + PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); } boolean changed = this.brokerController.getConsumerManager().registerConsumer( - data.getGroupName(), - clientChannelInfo, - data.getConsumeType(), - data.getMessageModel(), - data.getConsumeFromWhere(), - data.getSubscriptionDataSet(), - isNotifyConsumerIdsChangedEnable + data.getGroupName(), + clientChannelInfo, + data.getConsumeType(), + data.getMessageModel(), + data.getConsumeFromWhere(), + data.getSubscriptionDataSet(), + isNotifyConsumerIdsChangedEnable ); if (changed) { log.info("registerConsumer info changed {} {}", - data.toString(), - RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + data.toString(), + RemotingHelper.parseChannelRemoteAddr(ctx.channel()) ); } } for (ProducerData data : heartbeatData.getProducerDataSet()) { this.brokerController.getProducerManager().registerProducer(data.getGroupName(), - clientChannelInfo); + clientChannelInfo); } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); @@ -122,18 +121,18 @@ public class ClientManageProcessor implements NettyRequestProcessor { } public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { + throws RemotingCommandException { final RemotingCommand response = - RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class); + RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class); final UnregisterClientRequestHeader requestHeader = - (UnregisterClientRequestHeader) request - .decodeCommandCustomHeader(UnregisterClientRequestHeader.class); + (UnregisterClientRequestHeader)request + .decodeCommandCustomHeader(UnregisterClientRequestHeader.class); ClientChannelInfo clientChannelInfo = new ClientChannelInfo( - ctx.channel(), - requestHeader.getClientID(), - request.getLanguage(), - request.getVersion()); + ctx.channel(), + requestHeader.getClientID(), + request.getLanguage(), + request.getVersion()); { final String group = requestHeader.getProducerGroup(); if (group != null) { @@ -145,7 +144,7 @@ public class ClientManageProcessor implements NettyRequestProcessor { final String group = requestHeader.getConsumerGroup(); if (group != null) { SubscriptionGroupConfig subscriptionGroupConfig = - this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group); + this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group); boolean isNotifyConsumerIdsChangedEnable = true; if (null != subscriptionGroupConfig) { isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java index c0c43e0..d2e6d7d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java @@ -16,36 +16,39 @@ */ package org.apache.rocketmq.broker.processor; +import io.netty.channel.ChannelHandlerContext; +import java.util.List; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody; +import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader; +import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import io.netty.channel.ChannelHandlerContext; -import org.apache.rocketmq.common.protocol.header.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - - public class ConsumerManageProcessor implements NettyRequestProcessor { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; - public ConsumerManageProcessor(final BrokerController brokerController) { this.brokerController = brokerController; } @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { + throws RemotingCommandException { switch (request.getCode()) { case RequestCode.GET_CONSUMER_LIST_BY_GROUP: return this.getConsumerListByGroup(ctx, request); @@ -64,18 +67,17 @@ public class ConsumerManageProcessor implements NettyRequestProcessor { return false; } - public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { + throws RemotingCommandException { final RemotingCommand response = - RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class); + RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class); final GetConsumerListByGroupRequestHeader requestHeader = - (GetConsumerListByGroupRequestHeader) request - .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class); + (GetConsumerListByGroupRequestHeader)request + .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class); ConsumerGroupInfo consumerGroupInfo = - this.brokerController.getConsumerManager().getConsumerGroupInfo( - requestHeader.getConsumerGroup()); + this.brokerController.getConsumerManager().getConsumerGroupInfo( + requestHeader.getConsumerGroup()); if (consumerGroupInfo != null) { List<String> clientIds = consumerGroupInfo.getAllClientId(); if (!clientIds.isEmpty()) { @@ -87,11 +89,11 @@ public class ConsumerManageProcessor implements NettyRequestProcessor { return response; } else { log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(), - RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); } } else { log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(), - RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); } response.setCode(ResponseCode.SYSTEM_ERROR); @@ -100,34 +102,32 @@ public class ConsumerManageProcessor implements NettyRequestProcessor { } private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { + throws RemotingCommandException { final RemotingCommand response = - RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class); + RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class); final UpdateConsumerOffsetRequestHeader requestHeader = - (UpdateConsumerOffsetRequestHeader) request - .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); + (UpdateConsumerOffsetRequestHeader)request + .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(), - requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); + requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } - private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { + throws RemotingCommandException { final RemotingCommand response = - RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class); + RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class); final QueryConsumerOffsetResponseHeader responseHeader = - (QueryConsumerOffsetResponseHeader) response.readCustomHeader(); + (QueryConsumerOffsetResponseHeader)response.readCustomHeader(); final QueryConsumerOffsetRequestHeader requestHeader = - (QueryConsumerOffsetRequestHeader) request - .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class); + (QueryConsumerOffsetRequestHeader)request + .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class); long offset = - this.brokerController.getConsumerOffsetManager().queryOffset( - requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); - + this.brokerController.getConsumerOffsetManager().queryOffset( + requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); if (offset >= 0) { responseHeader.setOffset(offset); @@ -135,11 +135,11 @@ public class ConsumerManageProcessor implements NettyRequestProcessor { response.setRemark(null); } else { long minOffset = - this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(), - requestHeader.getQueueId()); + this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(), + requestHeader.getQueueId()); if (minOffset <= 0 - && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset( - requestHeader.getTopic(), requestHeader.getQueueId(), 0)) { + && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset( + requestHeader.getTopic(), requestHeader.getQueueId(), 0)) { responseHeader.setOffset(0L); response.setCode(ResponseCode.SUCCESS); response.setRemark(null);
