http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java index c493c70..6a34a69 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.broker.processor; +import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.constant.LoggerName; @@ -33,11 +34,9 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; -import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class EndTransactionProcessor implements NettyRequestProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); private final BrokerController brokerController; @@ -50,36 +49,35 @@ public class EndTransactionProcessor implements NettyRequestProcessor { public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final EndTransactionRequestHeader requestHeader = - (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class); - + (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class); if (requestHeader.getFromTransactionCheck()) { switch (requestHeader.getCommitOrRollback()) { case MessageSysFlag.TRANSACTION_NOT_TYPE: { LOGGER.warn("check producer[{}] transaction state, but it's pending status." - + "RequestHeader: {} Remark: {}", - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), - requestHeader.toString(), - request.getRemark()); + + "RequestHeader: {} Remark: {}", + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + requestHeader.toString(), + request.getRemark()); return null; } case MessageSysFlag.TRANSACTION_COMMIT_TYPE: { LOGGER.warn("check producer[{}] transaction state, the producer commit the message." - + "RequestHeader: {} Remark: {}", - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), - requestHeader.toString(), - request.getRemark()); + + "RequestHeader: {} Remark: {}", + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + requestHeader.toString(), + request.getRemark()); break; } case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: { LOGGER.warn("check producer[{}] transaction state, the producer rollback the message." - + "RequestHeader: {} Remark: {}", - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), - requestHeader.toString(), - request.getRemark()); + + "RequestHeader: {} Remark: {}", + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + requestHeader.toString(), + request.getRemark()); break; } default: @@ -89,10 +87,10 @@ public class EndTransactionProcessor implements NettyRequestProcessor { switch (requestHeader.getCommitOrRollback()) { case MessageSysFlag.TRANSACTION_NOT_TYPE: { LOGGER.warn("the producer[{}] end transaction in sending message, and it's pending status." - + "RequestHeader: {} Remark: {}", - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), - requestHeader.toString(), - request.getRemark()); + + "RequestHeader: {} Remark: {}", + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + requestHeader.toString(), + request.getRemark()); return null; } @@ -102,10 +100,10 @@ public class EndTransactionProcessor implements NettyRequestProcessor { case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: { LOGGER.warn("the producer[{}] end transaction in sending message, rollback the message." - + "RequestHeader: {} Remark: {}", - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), - requestHeader.toString(), - request.getRemark()); + + "RequestHeader: {} Remark: {}", + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + requestHeader.toString(), + request.getRemark()); break; } default: @@ -210,8 +208,8 @@ public class EndTransactionProcessor implements NettyRequestProcessor { MessageAccessor.setProperties(msgInner, msgExt.getProperties()); TopicFilterType topicFilterType = - (msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG - : TopicFilterType.SINGLE_TAG; + (msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG + : TopicFilterType.SINGLE_TAG; long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags()); msgInner.setTagsCode(tagsCodeValue); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java index 67e55a4..2a6482c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java @@ -6,36 +6,33 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.processor; +import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class ForwardRequestProcessor implements NettyRequestProcessor { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; - public ForwardRequestProcessor(final BrokerController brokerController) { this.brokerController = brokerController; } - @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { return null; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 041037f..7169b9c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -16,6 +16,13 @@ */ package org.apache.rocketmq.broker.processor; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.FileRegion; +import java.nio.ByteBuffer; +import java.util.List; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.longpolling.PullRequest; @@ -49,14 +56,9 @@ import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.stats.BrokerStatsManager; -import io.netty.channel.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; -import java.util.List; - - public class PullMessageProcessor implements NettyRequestProcessor { private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; @@ -77,12 +79,11 @@ public class PullMessageProcessor implements NettyRequestProcessor { } private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) - throws RemotingCommandException { + throws RemotingCommandException { RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); - final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader(); + final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader)response.readCustomHeader(); final PullMessageRequestHeader requestHeader = - (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class); - + (PullMessageRequestHeader)request.decodeCommandCustomHeader(PullMessageRequestHeader.class); response.setOpaque(request.getOpaque()); @@ -90,24 +91,21 @@ public class PullMessageProcessor implements NettyRequestProcessor { LOG.debug("receive PullMessage request command, " + request); } - if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] pulling message is forbidden"); return response; } - SubscriptionGroupConfig subscriptionGroupConfig = - this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); + this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); if (null == subscriptionGroupConfig) { response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); response.setRemark("subscription group not exist, " + requestHeader.getConsumerGroup() + " " - + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); + + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); return response; } - if (!subscriptionGroupConfig.isConsumeEnable()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup()); @@ -120,49 +118,45 @@ public class PullMessageProcessor implements NettyRequestProcessor { final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0; - TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (null == topicConfig) { LOG.error("the topic " + requestHeader.getTopic() + " not exist, consumer: " + RemotingHelper.parseChannelRemoteAddr(channel)); response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark( - "topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); + "topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; } - if (!PermName.isReadable(topicConfig.getPerm())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden"); return response; } - if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) { String errorInfo = "queueId[" + requestHeader.getQueueId() + "] is illagal,Topic :" + requestHeader.getTopic() - + " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress(); + + " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress(); LOG.warn(errorInfo); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(errorInfo); return response; } - SubscriptionData subscriptionData = null; if (hasSubscriptionFlag) { try { subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(), - requestHeader.getSubscription()); + requestHeader.getSubscription()); } catch (Exception e) { LOG.warn("parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), // - requestHeader.getConsumerGroup()); + requestHeader.getConsumerGroup()); response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); response.setRemark("parse the consumer's subscription failed"); return response; } } else { ConsumerGroupInfo consumerGroupInfo = - this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup()); + this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup()); if (null == consumerGroupInfo) { LOG.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup()); response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST); @@ -171,7 +165,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { } if (!subscriptionGroupConfig.isConsumeBroadcastEnable() // - && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) { + && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way"); return response; @@ -185,10 +179,9 @@ public class PullMessageProcessor implements NettyRequestProcessor { return response; } - if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) { LOG.warn("the broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(), - subscriptionData.getSubString()); + subscriptionData.getSubString()); response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST); response.setRemark("the consumer's subscription not latest"); return response; @@ -196,15 +189,14 @@ public class PullMessageProcessor implements NettyRequestProcessor { } final GetMessageResult getMessageResult = - this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), - requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData); + this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), + requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData); if (getMessageResult != null) { response.setRemark(getMessageResult.getStatus().name()); responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset()); responseHeader.setMinOffset(getMessageResult.getMinOffset()); responseHeader.setMaxOffset(getMessageResult.getMaxOffset()); - if (getMessageResult.isSuggestPullingFromSlave()) { responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); } else { @@ -250,11 +242,11 @@ public class PullMessageProcessor implements NettyRequestProcessor { // XXX: warn and notify me LOG.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", // - requestHeader.getQueueOffset(), // - getMessageResult.getNextBeginOffset(), // - requestHeader.getTopic(), // - requestHeader.getQueueId(), // - requestHeader.getConsumerGroup()// + requestHeader.getQueueOffset(), // + getMessageResult.getNextBeginOffset(), // + requestHeader.getTopic(), // + requestHeader.getQueueId(), // + requestHeader.getConsumerGroup()// ); } else { response.setCode(ResponseCode.PULL_NOT_FOUND); @@ -270,7 +262,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { response.setCode(ResponseCode.PULL_OFFSET_MOVED); // XXX: warn and notify me LOG.info("the request offset: " + requestHeader.getQueueOffset() + " over flow badly, broker max offset: " - + getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress()); + + getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress()); break; case OFFSET_OVERFLOW_ONE: response.setCode(ResponseCode.PULL_NOT_FOUND); @@ -278,8 +270,8 @@ public class PullMessageProcessor implements NettyRequestProcessor { case OFFSET_TOO_SMALL: response.setCode(ResponseCode.PULL_OFFSET_MOVED); LOG.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}", - requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(), - getMessageResult.getMinOffset(), channel.remoteAddress()); + requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(), + getMessageResult.getMinOffset(), channel.remoteAddress()); break; default: assert false; @@ -308,7 +300,6 @@ public class PullMessageProcessor implements NettyRequestProcessor { case ResponseCode.PULL_NOT_FOUND: if (!brokerAllowSuspend) { - context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS); context.setCommercialRcvTimes(1); context.setCommercialOwner(owner); @@ -333,23 +324,23 @@ public class PullMessageProcessor implements NettyRequestProcessor { case ResponseCode.SUCCESS: this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), - getMessageResult.getMessageCount()); + getMessageResult.getMessageCount()); this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(), - getMessageResult.getBufferTotalSize()); + getMessageResult.getBufferTotalSize()); this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount()); if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { final long beginTimeMills = this.brokerController.getMessageStore().now(); final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(), - requestHeader.getTopic(), requestHeader.getQueueId(), - (int) (this.brokerController.getMessageStore().now() - beginTimeMills)); + requestHeader.getTopic(), requestHeader.getQueueId(), + (int)(this.brokerController.getMessageStore().now() - beginTimeMills)); response.setBody(r); } else { try { FileRegion fileRegion = - new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult); + new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult); channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -379,18 +370,17 @@ public class PullMessageProcessor implements NettyRequestProcessor { long offset = requestHeader.getQueueOffset(); int queueId = requestHeader.getQueueId(); PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, - this.brokerController.getMessageStore().now(), offset, subscriptionData); + this.brokerController.getMessageStore().now(), offset, subscriptionData); this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); response = null; break; } - case ResponseCode.PULL_RETRY_IMMEDIATELY: break; case ResponseCode.PULL_OFFSET_MOVED: if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE - || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) { + || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) { MessageQueue mq = new MessageQueue(); mq.setTopic(requestHeader.getTopic()); mq.setQueueId(requestHeader.getQueueId()); @@ -403,15 +393,15 @@ public class PullMessageProcessor implements NettyRequestProcessor { event.setOffsetNew(getMessageResult.getNextBeginOffset()); this.generateOffsetMovedEvent(event); LOG.warn( - "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}", - requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(), - responseHeader.getSuggestWhichBrokerId()); + "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}", + requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(), + responseHeader.getSuggestWhichBrokerId()); } else { responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId()); response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); LOG.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}", - requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(), - responseHeader.getSuggestWhichBrokerId()); + requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(), + responseHeader.getSuggestWhichBrokerId()); } break; @@ -423,19 +413,17 @@ public class PullMessageProcessor implements NettyRequestProcessor { response.setRemark("store getMessage return null"); } - boolean storeOffsetEnable = brokerAllowSuspend; storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag; storeOffsetEnable = storeOffsetEnable - && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE; + && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE; if (storeOffsetEnable) { this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), - requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); + requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); } return response; } - public boolean hasConsumeMessageHook() { return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty(); } @@ -512,7 +500,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { LOG.error("processRequestWrapper response to " + future.channel().remoteAddress() + " failed", - future.cause()); + future.cause()); LOG.error(request.toString()); LOG.error(response.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java index 0b6b775..04f206f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java @@ -6,16 +6,20 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.processor; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.FileRegion; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.pagecache.OneMessageTransfer; import org.apache.rocketmq.broker.pagecache.QueryMessageTransfer; @@ -31,28 +35,21 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.QueryMessageResult; import org.apache.rocketmq.store.SelectMappedBufferResult; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.FileRegion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class QueryMessageProcessor implements NettyRequestProcessor { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; - public QueryMessageProcessor(final BrokerController brokerController) { this.brokerController = brokerController; } - @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { + throws RemotingCommandException { switch (request.getCode()) { case RequestCode.QUERY_MESSAGE: return this.queryMessage(ctx, request); @@ -70,44 +67,40 @@ public class QueryMessageProcessor implements NettyRequestProcessor { return false; } - public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { + throws RemotingCommandException { final RemotingCommand response = - RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class); + RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class); final QueryMessageResponseHeader responseHeader = - (QueryMessageResponseHeader) response.readCustomHeader(); + (QueryMessageResponseHeader)response.readCustomHeader(); final QueryMessageRequestHeader requestHeader = - (QueryMessageRequestHeader) request - .decodeCommandCustomHeader(QueryMessageRequestHeader.class); - + (QueryMessageRequestHeader)request + .decodeCommandCustomHeader(QueryMessageRequestHeader.class); response.setOpaque(request.getOpaque()); - String isUniqueKey = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG); if (isUniqueKey != null && isUniqueKey.equals("true")) { requestHeader.setMaxNum(this.brokerController.getMessageStoreConfig().getDefaultQueryMaxNum()); } final QueryMessageResult queryMessageResult = - this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(), - requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(), - requestHeader.getEndTimestamp()); + this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(), + requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(), + requestHeader.getEndTimestamp()); assert queryMessageResult != null; responseHeader.setIndexLastUpdatePhyoffset(queryMessageResult.getIndexLastUpdatePhyoffset()); responseHeader.setIndexLastUpdateTimestamp(queryMessageResult.getIndexLastUpdateTimestamp()); - if (queryMessageResult.getBufferTotalSize() > 0) { response.setCode(ResponseCode.SUCCESS); response.setRemark(null); try { FileRegion fileRegion = - new QueryMessageTransfer(response.encodeHeader(queryMessageResult - .getBufferTotalSize()), queryMessageResult); + new QueryMessageTransfer(response.encodeHeader(queryMessageResult + .getBufferTotalSize()), queryMessageResult); ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -130,26 +123,24 @@ public class QueryMessageProcessor implements NettyRequestProcessor { return response; } - public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request) - throws RemotingCommandException { + throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final ViewMessageRequestHeader requestHeader = - (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class); - + (ViewMessageRequestHeader)request.decodeCommandCustomHeader(ViewMessageRequestHeader.class); response.setOpaque(request.getOpaque()); final SelectMappedBufferResult selectMappedBufferResult = - this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset()); + this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset()); if (selectMappedBufferResult != null) { response.setCode(ResponseCode.SUCCESS); response.setRemark(null); try { FileRegion fileRegion = - new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()), - selectMappedBufferResult); + new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()), + selectMappedBufferResult); ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 6002df2..1b95205 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -17,11 +17,17 @@ package org.apache.rocketmq.broker.processor; import io.netty.channel.ChannelHandlerContext; +import java.net.SocketAddress; +import java.util.List; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; -import org.apache.rocketmq.common.*; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.TopicFilterType; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.common.message.MessageAccessor; @@ -44,10 +50,6 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.stats.BrokerStatsManager; -import java.net.SocketAddress; -import java.util.List; - - public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { private List<ConsumeMessageHook> consumeMessageHookList; @@ -80,14 +82,14 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement @Override public boolean rejectRequest() { return this.brokerController.getMessageStore().isOSPageCacheBusy() || - this.brokerController.getMessageStore().isTransientStorePoolDeficient(); + this.brokerController.getMessageStore().isTransientStorePoolDeficient(); } private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) - throws RemotingCommandException { + throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final ConsumerSendMsgBackRequestHeader requestHeader = - (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); + (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) { @@ -101,24 +103,21 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement this.executeConsumeMessageHookAfter(context); } - SubscriptionGroupConfig subscriptionGroupConfig = - this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup()); + this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup()); if (null == subscriptionGroupConfig) { response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " " - + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); + + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); return response; } - if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden"); return response; } - if (subscriptionGroupConfig.getRetryQueueNums() <= 0) { response.setCode(ResponseCode.SUCCESS); response.setRemark(null); @@ -128,24 +127,21 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement String newTopic = MixAll.getRetryTopic(requestHeader.getGroup()); int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums(); - int topicSysFlag = 0; if (requestHeader.isUnitMode()) { topicSysFlag = TopicSysFlag.buildSysFlag(false, true); } - TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(// - newTopic, // - subscriptionGroupConfig.getRetryQueueNums(), // - PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); + newTopic, // + subscriptionGroupConfig.getRetryQueueNums(), // + PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); return response; } - if (!PermName.isWriteable(topicConfig.getPerm())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic)); @@ -159,31 +155,27 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return response; } - final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); if (null == retryTopic) { MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic()); } msgExt.setWaitStoreMsgOK(false); - int delayLevel = requestHeader.getDelayLevel(); - int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); } - if (msgExt.getReconsumeTimes() >= maxReconsumeTimes// - || delayLevel < 0) { + || delayLevel < 0) { newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, // - DLQ_NUMS_PER_GROUP, // - PermName.PERM_WRITE, 0 + DLQ_NUMS_PER_GROUP, // + PermName.PERM_WRITE, 0 ); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); @@ -247,13 +239,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } private RemotingCommand sendMessage(final ChannelHandlerContext ctx, // - final RemotingCommand request, // - final SendMessageContext sendMessageContext, // - final SendMessageRequestHeader requestHeader) throws RemotingCommandException { + final RemotingCommand request, // + final SendMessageContext sendMessageContext, // + final SendMessageRequestHeader requestHeader) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); - final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader(); - + final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); response.setOpaque(request.getOpaque()); @@ -296,15 +287,14 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); SubscriptionGroupConfig subscriptionGroupConfig = - this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); + this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); if (null == subscriptionGroupConfig) { response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); response.setRemark( - "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); + "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); return response; } - int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); @@ -314,8 +304,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement newTopic = MixAll.getDLQTopic(groupName); queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, // - DLQ_NUMS_PER_GROUP, // - PermName.PERM_WRITE, 0 + DLQ_NUMS_PER_GROUP, // + PermName.PERM_WRITE, 0 ); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); @@ -344,7 +334,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement if (traFlag != null) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( - "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); + "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return response; } } @@ -381,12 +371,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement case PROPERTIES_SIZE_EXCEEDED: response.setCode(ResponseCode.MESSAGE_ILLEGAL); response.setRemark( - "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."); + "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."); break; case SERVICE_NOT_AVAILABLE: response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); response.setRemark( - "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small."); + "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small."); break; case OS_PAGECACHE_BUSY: response.setCode(ResponseCode.SYSTEM_ERROR); @@ -407,7 +397,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic()); this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), - putMessageResult.getAppendMessageResult().getWroteBytes()); + putMessageResult.getAppendMessageResult().getWroteBytes()); this.brokerController.getBrokerStatsManager().incBrokerPutNums(); response.setRemark(null); @@ -416,10 +406,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement responseHeader.setQueueId(queueIdInt); responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); - doResponse(ctx, request, response); - if (hasSendMessageHook()) { sendMessageContext.setMsgId(responseHeader.getMsgId()); sendMessageContext.setQueueId(responseHeader.getQueueId()); @@ -427,7 +415,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount(); int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes(); - int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount; + int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount; sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS); sendMessageContext.setCommercialSendTimes(incValue); @@ -438,7 +426,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } else { if (hasSendMessageHook()) { int wroteSize = request.getBody().length; - int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT); + int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT); sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE); sendMessageContext.setCommercialSendTimes(incValue); @@ -479,11 +467,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); String storePathLogis = - StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); + StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogis); String storePathIndex = - StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); + StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathIndex); return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index 2db2317..2545f1f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -6,16 +6,17 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.slave; +import java.io.IOException; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.common.MixAll; @@ -27,30 +28,23 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - - public class SlaveSynchronize { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; private volatile String masterAddr = null; - public SlaveSynchronize(BrokerController brokerController) { this.brokerController = brokerController; } - public String getMasterAddr() { return masterAddr; } - public void setMasterAddr(String masterAddr) { this.masterAddr = masterAddr; } - public void syncAll() { this.syncTopicConfig(); this.syncConsumerOffset(); @@ -58,21 +52,20 @@ public class SlaveSynchronize { this.syncSubscriptionGroupConfig(); } - private void syncTopicConfig() { String masterAddrBak = this.masterAddr; if (masterAddrBak != null) { try { TopicConfigSerializeWrapper topicWrapper = - this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak); + this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak); if (!this.brokerController.getTopicConfigManager().getDataVersion() - .equals(topicWrapper.getDataVersion())) { + .equals(topicWrapper.getDataVersion())) { this.brokerController.getTopicConfigManager().getDataVersion() - .assignNewOne(topicWrapper.getDataVersion()); + .assignNewOne(topicWrapper.getDataVersion()); this.brokerController.getTopicConfigManager().getTopicConfigTable().clear(); this.brokerController.getTopicConfigManager().getTopicConfigTable() - .putAll(topicWrapper.getTopicConfigTable()); + .putAll(topicWrapper.getTopicConfigTable()); this.brokerController.getTopicConfigManager().persist(); log.info("update slave topic config from master, {}", masterAddrBak); @@ -83,15 +76,14 @@ public class SlaveSynchronize { } } - private void syncConsumerOffset() { String masterAddrBak = this.masterAddr; if (masterAddrBak != null) { try { ConsumerOffsetSerializeWrapper offsetWrapper = - this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak); + this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak); this.brokerController.getConsumerOffsetManager().getOffsetTable() - .putAll(offsetWrapper.getOffsetTable()); + .putAll(offsetWrapper.getOffsetTable()); this.brokerController.getConsumerOffsetManager().persist(); log.info("update slave consumer offset from master, {}", masterAddrBak); } catch (Exception e) { @@ -100,18 +92,17 @@ public class SlaveSynchronize { } } - private void syncDelayOffset() { String masterAddrBak = this.masterAddr; if (masterAddrBak != null) { try { String delayOffset = - this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak); + this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak); if (delayOffset != null) { String fileName = - StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController - .getMessageStoreConfig().getStorePathRootDir()); + StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController + .getMessageStoreConfig().getStorePathRootDir()); try { MixAll.string2File(delayOffset, fileName); } catch (IOException e) { @@ -125,24 +116,23 @@ public class SlaveSynchronize { } } - private void syncSubscriptionGroupConfig() { String masterAddrBak = this.masterAddr; if (masterAddrBak != null) { try { SubscriptionGroupWrapper subscriptionWrapper = - this.brokerController.getBrokerOuterAPI() - .getAllSubscriptionGroupConfig(masterAddrBak); + this.brokerController.getBrokerOuterAPI() + .getAllSubscriptionGroupConfig(masterAddrBak); if (!this.brokerController.getSubscriptionGroupManager().getDataVersion() - .equals(subscriptionWrapper.getDataVersion())) { + .equals(subscriptionWrapper.getDataVersion())) { SubscriptionGroupManager subscriptionGroupManager = - this.brokerController.getSubscriptionGroupManager(); + this.brokerController.getSubscriptionGroupManager(); subscriptionGroupManager.getDataVersion().assignNewOne( - subscriptionWrapper.getDataVersion()); + subscriptionWrapper.getDataVersion()); subscriptionGroupManager.getSubscriptionGroupTable().clear(); subscriptionGroupManager.getSubscriptionGroupTable().putAll( - subscriptionWrapper.getSubscriptionGroupTable()); + subscriptionWrapper.getSubscriptionGroupTable()); subscriptionGroupManager.persist(); log.info("update slave Subscription Group from master, {}", masterAddrBak); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index 7865bc7..f77249a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -16,6 +16,10 @@ */ package org.apache.rocketmq.broker.subscription; +import java.io.File; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -27,25 +31,23 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; - - public class SubscriptionGroupManager extends ConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable = - new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024); + new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024); private final DataVersion dataVersion = new DataVersion(); private transient BrokerController brokerController; - public SubscriptionGroupManager() { this.init(); } + public SubscriptionGroupManager(BrokerController brokerController) { + this.brokerController = brokerController; + this.init(); + } + private void init() { { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); @@ -94,13 +96,6 @@ public class SubscriptionGroupManager extends ConfigManager { } } - - public SubscriptionGroupManager(BrokerController brokerController) { - this.brokerController = brokerController; - this.init(); - } - - public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) { SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config); if (old != null) { @@ -122,7 +117,6 @@ public class SubscriptionGroupManager extends ConfigManager { } } - public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) { SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group); if (null == subscriptionGroupConfig) { @@ -141,7 +135,6 @@ public class SubscriptionGroupManager extends ConfigManager { return subscriptionGroupConfig; } - @Override public String encode() { return this.encode(false); @@ -181,12 +174,10 @@ public class SubscriptionGroupManager extends ConfigManager { return subscriptionGroupTable; } - public DataVersion getDataVersion() { return dataVersion; } - public void deleteSubscriptionGroupConfig(final String groupName) { SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName); if (old != null) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 9e14332..e826d24 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -16,6 +16,16 @@ */ package org.apache.rocketmq.broker.topic; +import java.io.File; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -30,34 +40,20 @@ import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - - public class TopicConfigManager extends ConfigManager { private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final long LOCK_TIMEOUT_MILLIS = 3000; private transient final Lock lockTopicConfigTable = new ReentrantLock(); private final ConcurrentHashMap<String, TopicConfig> topicConfigTable = - new ConcurrentHashMap<String, TopicConfig>(1024); + new ConcurrentHashMap<String, TopicConfig>(1024); private final DataVersion dataVersion = new DataVersion(); private final Set<String> systemTopicList = new HashSet<String>(); private transient BrokerController brokerController; - public TopicConfigManager() { } - public TopicConfigManager(BrokerController brokerController) { this.brokerController = brokerController; { @@ -76,9 +72,9 @@ public class TopicConfigManager extends ConfigManager { TopicConfig topicConfig = new TopicConfig(topic); this.systemTopicList.add(topic); topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig() - .getDefaultTopicQueueNums()); + .getDefaultTopicQueueNums()); topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig() - .getDefaultTopicQueueNums()); + .getDefaultTopicQueueNums()); int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE; topicConfig.setPerm(perm); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); @@ -130,29 +126,24 @@ public class TopicConfigManager extends ConfigManager { } } - public boolean isSystemTopic(final String topic) { return this.systemTopicList.contains(topic); } - public Set<String> getSystemTopic() { return this.systemTopicList; } - public boolean isTopicCanSendMessage(final String topic) { return !topic.equals(MixAll.DEFAULT_TOPIC); } - public TopicConfig selectTopicConfig(final String topic) { return this.topicConfigTable.get(topic); } - public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic, - final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) { + final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) { TopicConfig topicConfig = null; boolean createNew = false; @@ -175,8 +166,8 @@ public class TopicConfigManager extends ConfigManager { topicConfig = new TopicConfig(topic); int queueNums = - clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig - .getWriteQueueNums() : clientDefaultTopicQueueNums; + clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig + .getWriteQueueNums() : clientDefaultTopicQueueNums; if (queueNums < 0) { queueNums = 0; @@ -191,17 +182,17 @@ public class TopicConfigManager extends ConfigManager { topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType()); } else { LOG.warn("create new topic failed, because the default topic[" + defaultTopic - + "] no perm, " + defaultTopicConfig.getPerm() + " producer: " - + remoteAddress); + + "] no perm, " + defaultTopicConfig.getPerm() + " producer: " + + remoteAddress); } } else { LOG.warn("create new topic failed, because the default topic[" + defaultTopic - + "] not exist." + " producer: " + remoteAddress); + + "] not exist." + " producer: " + remoteAddress); } if (topicConfig != null) { LOG.info("create new topic by default topic[" + defaultTopic + "], " + topicConfig - + " producer: " + remoteAddress); + + " producer: " + remoteAddress); this.topicConfigTable.put(topic, topicConfig); @@ -227,10 +218,10 @@ public class TopicConfigManager extends ConfigManager { } public TopicConfig createTopicInSendMessageBackMethod( - final String topic, - final int clientDefaultTopicQueueNums, - final int perm, - final int topicSysFlag) { + final String topic, + final int clientDefaultTopicQueueNums, + final int perm, + final int topicSysFlag) { TopicConfig topicConfig = this.topicConfigTable.get(topic); if (topicConfig != null) return topicConfig; @@ -282,7 +273,7 @@ public class TopicConfigManager extends ConfigManager { } LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag, - topicConfig.getTopicSysFlag()); + topicConfig.getTopicSysFlag()); this.topicConfigTable.put(topic, topicConfig); @@ -302,7 +293,7 @@ public class TopicConfigManager extends ConfigManager { } LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag, - topicConfig.getTopicSysFlag()); + topicConfig.getTopicSysFlag()); this.topicConfigTable.put(topic, topicConfig); @@ -326,7 +317,6 @@ public class TopicConfigManager extends ConfigManager { this.persist(); } - public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) { if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) { @@ -403,7 +393,7 @@ public class TopicConfigManager extends ConfigManager { public void decode(String jsonString) { if (jsonString != null) { TopicConfigSerializeWrapper topicConfigSerializeWrapper = - TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class); + TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class); if (topicConfigSerializeWrapper != null) { this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable()); this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion()); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java index 68256d9..830a0c5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.transaction; @@ -22,22 +22,18 @@ public class TransactionRecord { private long offset; private String producerGroup; - public long getOffset() { return offset; } - public void setOffset(long offset) { this.offset = offset; } - public String getProducerGroup() { return producerGroup; } - public void setProducerGroup(String producerGroup) { this.producerGroup = producerGroup; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java index d6e897a..f9b56d5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java @@ -6,41 +6,33 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.transaction; import java.util.List; - public interface TransactionStore { boolean open(); - void close(); - boolean put(final List<TransactionRecord> trs); - void remove(final List<Long> pks); - List<TransactionRecord> traverse(final long pk, final int nums); - long totalRecords(); - long minPK(); - long maxPK(); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java index 4bf73d2..240e141 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java @@ -6,17 +6,27 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.transaction.jdbc; +import java.net.URL; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.broker.transaction.TransactionRecord; import org.apache.rocketmq.broker.transaction.TransactionStore; import org.apache.rocketmq.common.MixAll; @@ -24,13 +34,6 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URL; -import java.sql.*; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicLong; - - public class JDBCTransactionStore implements TransactionStore { private static final Logger log = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); private final JDBCTransactionStoreConfig jdbcTransactionStoreConfig; @@ -50,11 +53,10 @@ public class JDBCTransactionStore implements TransactionStore { try { this.connection = - DriverManager.getConnection(this.jdbcTransactionStoreConfig.getJdbcURL(), props); + DriverManager.getConnection(this.jdbcTransactionStoreConfig.getJdbcURL(), props); this.connection.setAutoCommit(false); - if (!this.computeTotalRecords()) { return this.createDB(); } @@ -72,7 +74,7 @@ public class JDBCTransactionStore implements TransactionStore { try { Class.forName(this.jdbcTransactionStoreConfig.getJdbcDriverClass()).newInstance(); log.info("Loaded the appropriate driver, {}", - this.jdbcTransactionStoreConfig.getJdbcDriverClass()); + this.jdbcTransactionStoreConfig.getJdbcDriverClass()); return true; } catch (Exception e) { log.info("Loaded the appropriate driver Exception", e); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java index 5789329..86c1ec8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker.transaction.jdbc; @@ -23,42 +23,34 @@ public class JDBCTransactionStoreConfig { private String jdbcUser = "xxx"; private String jdbcPassword = "xxx"; - public String getJdbcDriverClass() { return jdbcDriverClass; } - public void setJdbcDriverClass(String jdbcDriverClass) { this.jdbcDriverClass = jdbcDriverClass; } - public String getJdbcURL() { return jdbcURL; } - public void setJdbcURL(String jdbcURL) { this.jdbcURL = jdbcURL; } - public String getJdbcUser() { return jdbcUser; } - public void setJdbcUser(String jdbcUser) { this.jdbcUser = jdbcUser; } - public String getJdbcPassword() { return jdbcPassword; } - public void setJdbcPassword(String jdbcPassword) { this.jdbcPassword = jdbcPassword; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java index 6e7b9b0..f7675c2 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.broker; @@ -27,9 +27,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class BrokerControllerTest { - protected Logger logger = LoggerFactory.getLogger(BrokerControllerTest.class); - private static final int RESTART_NUM = 3; + protected Logger logger = LoggerFactory.getLogger(BrokerControllerTest.class); /** * Tests if the controller can be properly stopped and started. http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java index 4fd7a5b..5e944d8 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java @@ -6,13 +6,15 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z vintagew...@apache.org $ */ /** @@ -20,6 +22,8 @@ */ package org.apache.rocketmq.broker; +import java.io.File; +import java.util.Random; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; @@ -30,15 +34,11 @@ import org.junit.Before; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.util.Random; - public class BrokerTestHarness { + public final String BROKER_NAME = "TestBrokerName"; protected BrokerController brokerController = null; - protected Random random = new Random(); - public final String BROKER_NAME = "TestBrokerName"; protected String brokerAddr = ""; protected Logger logger = LoggerFactory.getLogger(BrokerTestHarness.class); protected BrokerConfig brokerConfig = new BrokerConfig();