// Patch origin: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
// File: broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java (added by this commit).
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.broker.processor;

import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.common.*;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

import java.net.SocketAddress;
import java.util.List;


/**
 * Handles two kinds of requests: {@code CONSUMER_SEND_MSG_BACK} (a consumer returning a message for
 * retry or dead-lettering) and, for every other request code, a regular message send.
 *
 * @author shijia.wxr
 */
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

    /** Bound applied to the raw random value before it is reduced to a queue index. */
    private static final int RANDOM_QUEUE_BOUND = 99999999;

    /** Hooks invoked after a send-back request is received; may be {@code null} until registered. */
    private List<ConsumeMessageHook> consumeMessageHookList;

    public SendMessageProcessor(final BrokerController brokerController) {
        super(brokerController);
    }

    /**
     * Dispatches the request by code.
     *
     * @param ctx     channel the request arrived on
     * @param request decoded remoting command
     * @return the response to write back, or {@code null} when the header could not be parsed or the
     *         response has already been written by {@link #sendMessage}
     * @throws RemotingCommandException if the custom header cannot be decoded
     */
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        if (request.getCode() == RequestCode.CONSUMER_SEND_MSG_BACK) {
            return this.consumerSendMsgBack(ctx, request);
        }

        final SendMessageRequestHeader requestHeader = parseRequestHeader(request);
        if (requestHeader == null) {
            return null;
        }

        final SendMessageContext mqtraceContext = buildMsgContext(ctx, requestHeader);
        this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
        final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
        this.executeSendMessageHookAfter(response, mqtraceContext);
        return response;
    }

    /**
     * @return {@code true} when the message store reports a busy OS page cache or a deficient
     *         transient store pool
     */
    @Override
    public boolean rejectRequest() {
        return this.brokerController.getMessageStore().isOSPageCacheBusy()
            || this.brokerController.getMessageStore().isTransientStorePoolDeficient();
    }

    /**
     * Re-stores a message that a consumer sent back: into the group's retry topic with a delay level,
     * or into the group's DLQ topic once the reconsume limit is reached or the delay level is negative.
     *
     * @param ctx     channel the request arrived on (unused here)
     * @param request the send-back request
     * @return a response carrying the outcome code and, on failure, a remark
     * @throws RemotingCommandException if the custom header cannot be decoded
     */
    private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
        throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final ConsumerSendMsgBackRequestHeader requestHeader =
            (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

        if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
            ConsumeMessageContext context = new ConsumeMessageContext();
            context.setConsumerGroup(requestHeader.getGroup());
            context.setTopic(requestHeader.getOriginTopic());
            context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
            context.setCommercialRcvTimes(1);
            context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));

            this.executeConsumeMessageHookAfter(context);
        }

        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
                + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return response;
        }

        if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
            return response;
        }

        // A group configured with no retry queues: acknowledge the request without storing anything.
        if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }

        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        int queueIdInt = this.randomQueueId(subscriptionGroupConfig.getRetryQueueNums());

        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
        }

        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
            newTopic,
            subscriptionGroupConfig.getRetryQueueNums(),
            PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        if (null == topicConfig) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("topic[" + newTopic + "] not exist");
            return response;
        }

        if (!PermName.isWriteable(topicConfig.getPerm())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
            return response;
        }

        MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
        if (null == msgExt) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("look message by offset failed, " + requestHeader.getOffset());
            return response;
        }

        // Record the topic the message was first sent back from, if not already recorded.
        final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (null == retryTopic) {
            MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
        }
        msgExt.setWaitStoreMsgOK(false);

        int delayLevel = requestHeader.getDelayLevel();

        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            // Read through a boxed local so an absent header value keeps the group default
            // instead of failing on unboxing.
            final Integer requestedMax = requestHeader.getMaxReconsumeTimes();
            if (requestedMax != null) {
                maxReconsumeTimes = requestedMax;
            }
        }

        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) {
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            queueIdInt = this.randomQueueId(DLQ_NUMS_PER_GROUP);

            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                DLQ_NUMS_PER_GROUP,
                PermName.PERM_WRITE, 0);
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return response;
            }
        } else {
            if (0 == delayLevel) {
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }

            msgExt.setDelayTimeLevel(delayLevel);
        }

        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic);
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);

        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        if (putMessageResult == null) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("putMessageResult is null");
            return response;
        }

        switch (putMessageResult.getPutMessageStatus()) {
            case PUT_OK:
                String backTopic = msgExt.getTopic();
                String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                if (correctTopic != null) {
                    backTopic = correctTopic;
                }

                this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);

                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);
                return response;
            default:
                break;
        }

        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(putMessageResult.getPutMessageStatus().name());
        return response;
    }

    /**
     * Stores a producer's message and builds the response.
     *
     * <p>On a successful put the response is written via {@code doResponse} inside this method and
     * {@code null} is returned; on failure the populated response is returned for the caller to send.
     *
     * @param ctx                channel the request arrived on
     * @param request            the send request
     * @param sendMessageContext trace context filled with the outcome when send hooks are registered
     * @param requestHeader      decoded request header
     * @return {@code null} if the response was already written, otherwise the failure response
     * @throws RemotingCommandException declared for the header/response helpers this method calls
     */
    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
        final RemotingCommand request,
        final SendMessageContext sendMessageContext,
        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

        response.setOpaque(request.getOpaque());

        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

        if (log.isDebugEnabled()) {
            log.debug("receive SendMessage request command, " + request);
        }

        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
        if (this.brokerController.getMessageStore().now() < startTimstamp) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
            return response;
        }

        // -1 is a sentinel: msgCheck overwrites it only when it rejects the request.
        response.setCode(-1);
        super.msgCheck(ctx, requestHeader, response);
        if (response.getCode() != -1) {
            return response;
        }

        final byte[] body = request.getBody();

        int queueIdInt = requestHeader.getQueueId();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

        if (queueIdInt < 0) {
            queueIdInt = this.randomQueueId(topicConfig.getWriteQueueNums());
        }

        int sysFlag = requestHeader.getSysFlag();

        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
        }

        // Evaluated once and reused for both the DLQ decision and the stored message.
        final int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();

        String newTopic = requestHeader.getTopic();
        if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
            if (null == subscriptionGroupConfig) {
                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
                response.setRemark(
                    "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
                return response;
            }

            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
                // Same null-tolerant read as in consumerSendMsgBack.
                final Integer requestedMax = requestHeader.getMaxReconsumeTimes();
                if (requestedMax != null) {
                    maxReconsumeTimes = requestedMax;
                }
            }
            if (reconsumeTimes >= maxReconsumeTimes) {
                newTopic = MixAll.getDLQTopic(groupName);
                queueIdInt = this.randomQueueId(DLQ_NUMS_PER_GROUP);
                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                    newTopic,
                    DLQ_NUMS_PER_GROUP,
                    PermName.PERM_WRITE, 0);
                if (null == topicConfig) {
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("topic[" + newTopic + "] not exist");
                    return response;
                }
            }
        }

        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic);
        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        msgInner.setPropertiesString(requestHeader.getProperties());
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));

        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(sysFlag);
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(reconsumeTimes);

        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (traFlag != null) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
                return response;
            }
        }

        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        if (putMessageResult == null) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("store putMessage return null");
            return response;
        }

        final boolean sendOK = this.applyPutStatus(putMessageResult, response);
        final String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);

        if (!sendOK) {
            if (hasSendMessageHook()) {
                // The body may be absent on a rejected request; count it as zero bytes.
                int wroteSize = body == null ? 0 : body.length;
                int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);

                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
                sendMessageContext.setCommercialSendTimes(incValue);
                sendMessageContext.setCommercialSendSize(wroteSize);
                sendMessageContext.setCommercialOwner(owner);
            }
            return response;
        }

        this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
        this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
            putMessageResult.getAppendMessageResult().getWroteBytes());
        this.brokerController.getBrokerStatsManager().incBrokerPutNums();

        response.setRemark(null);

        responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
        responseHeader.setQueueId(queueIdInt);
        responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());

        // The response is written here, before the trace context is filled; hence the null return below.
        doResponse(ctx, request, response);

        if (hasSendMessageHook()) {
            sendMessageContext.setMsgId(responseHeader.getMsgId());
            sendMessageContext.setQueueId(responseHeader.getQueueId());
            sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());

            int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
            int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
            int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;

            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
            sendMessageContext.setCommercialSendTimes(incValue);
            sendMessageContext.setCommercialSendSize(wroteSize);
            sendMessageContext.setCommercialOwner(owner);
        }
        return null;
    }

    /**
     * Translates the store's put status into a response code (and remark for failures).
     *
     * @return {@code true} for the statuses treated as "message stored" (PUT_OK and the three
     *         flush/slave statuses), {@code false} otherwise
     */
    private boolean applyPutStatus(final PutMessageResult putMessageResult, final RemotingCommand response) {
        switch (putMessageResult.getPutMessageStatus()) {
            // Success
            case PUT_OK:
                response.setCode(ResponseCode.SUCCESS);
                return true;
            case FLUSH_DISK_TIMEOUT:
                response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
                return true;
            case FLUSH_SLAVE_TIMEOUT:
                response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
                return true;
            case SLAVE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
                return true;

            // Failed
            case CREATE_MAPEDFILE_FAILED:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("create mapped file failed, server is busy or broken.");
                return false;
            case MESSAGE_ILLEGAL:
            case PROPERTIES_SIZE_EXCEEDED:
                response.setCode(ResponseCode.MESSAGE_ILLEGAL);
                response.setRemark(
                    "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
                return false;
            case SERVICE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
                response.setRemark(
                    "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
                return false;
            case OS_PAGECACHE_BUSY:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
                return false;
            case UNKNOWN_ERROR:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UNKNOWN_ERROR");
                return false;
            default:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UNKNOWN_ERROR DEFAULT");
                return false;
        }
    }

    /**
     * Picks a pseudo-random queue index in {@code [0, queueNums)}.
     * The first modulo keeps the value away from {@code Integer.MIN_VALUE}, so {@code Math.abs} is safe.
     */
    private int randomQueueId(final int queueNums) {
        return Math.abs(this.random.nextInt() % RANDOM_QUEUE_BOUND) % queueNums;
    }

    /** @return {@code true} when at least one consume-message hook has been registered */
    public boolean hasConsumeMessageHook() {
        return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
    }

    /**
     * Invokes {@code consumeMessageAfter} on every registered hook. A failing hook never aborts
     * request processing or the remaining hooks, but the failure is now logged instead of dropped.
     *
     * @param context the consume context passed to each hook
     */
    public void executeConsumeMessageHookAfter(final ConsumeMessageContext context) {
        if (!hasConsumeMessageHook()) {
            return;
        }
        for (ConsumeMessageHook hook : this.consumeMessageHookList) {
            try {
                hook.consumeMessageAfter(context);
            } catch (Throwable e) {
                log.warn("consumeMessageAfter hook threw an exception", e);
            }
        }
    }

    /** @return the store host address held by the base processor */
    public SocketAddress getStoreHost() {
        return storeHost;
    }

    /**
     * Formats the used-space ratios of the commit-log, consume-queue and index partitions for the
     * "service not available" remark.
     */
    private String diskUtil() {
        final String rootDir = this.brokerController.getMessageStoreConfig().getStorePathRootDir();

        String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
        double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);

        String storePathLogis = StorePathConfigHelper.getStorePathConsumeQueue(rootDir);
        double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogis);

        String storePathIndex = StorePathConfigHelper.getStorePathIndex(rootDir);
        double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathIndex);

        return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio);
    }

    /**
     * Replaces the list of consume-message hooks.
     *
     * @param consumeMessageHookList hooks to invoke from {@link #executeConsumeMessageHookAfter}
     */
    public void registerConsumeMessageHook(List<ConsumeMessageHook> consumeMessageHookList) {
        this.consumeMessageHookList = consumeMessageHookList;
    }
}
// Patch origin: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
// File: broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java (added by this commit).
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.broker.slave;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;


/**
 * Pulls topic configuration, consumer offsets, delay offsets and subscription-group configuration
 * from the master broker address and applies them to this broker's managers.
 *
 * @author shijia.wxr
 * @author manhong.yqd
 */
public class SlaveSynchronize {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;
    /** Address of the master to sync from; {@code null} disables every sync step. */
    private volatile String masterAddr = null;


    public SlaveSynchronize(BrokerController brokerController) {
        this.brokerController = brokerController;
    }


    public String getMasterAddr() {
        return masterAddr;
    }


    public void setMasterAddr(String masterAddr) {
        this.masterAddr = masterAddr;
    }


    /** Runs every sync step in order; each step logs and swallows its own failures. */
    public void syncAll() {
        this.syncTopicConfig();
        this.syncConsumerOffset();
        this.syncDelayOffset();
        this.syncSubscriptionGroupConfig();
    }


    /**
     * Makes {@code target} hold exactly the mappings of {@code source} without ever emptying it:
     * new/changed entries are written first, then keys missing from {@code source} are removed.
     * A clear-then-putAll sequence would expose an empty table to concurrent readers.
     */
    private static <K, V> void replaceContents(final Map<K, V> target, final Map<? extends K, ? extends V> source) {
        target.putAll(source);
        target.keySet().retainAll(source.keySet());
    }


    /** Replaces the local topic table with the master's when the data versions differ. */
    private void syncTopicConfig() {
        // Snapshot the volatile field so the whole step uses one address.
        final String masterAddrBak = this.masterAddr;
        if (masterAddrBak == null) {
            return;
        }
        try {
            TopicConfigSerializeWrapper topicWrapper =
                this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
            if (!this.brokerController.getTopicConfigManager().getDataVersion()
                .equals(topicWrapper.getDataVersion())) {

                replaceContents(this.brokerController.getTopicConfigManager().getTopicConfigTable(),
                    topicWrapper.getTopicConfigTable());
                // Adopt the master's version only after the table is updated, so a failure above
                // leaves the versions different and the next sync retries.
                this.brokerController.getTopicConfigManager().getDataVersion()
                    .assignNewOne(topicWrapper.getDataVersion());
                this.brokerController.getTopicConfigManager().persist();

                log.info("update slave topic config from master, {}", masterAddrBak);
            }
        } catch (Exception e) {
            log.error("syncTopicConfig Exception, " + masterAddrBak, e);
        }
    }


    /** Merges the master's consumer offsets into the local offset table and persists it. */
    private void syncConsumerOffset() {
        final String masterAddrBak = this.masterAddr;
        if (masterAddrBak == null) {
            return;
        }
        try {
            ConsumerOffsetSerializeWrapper offsetWrapper =
                this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
            this.brokerController.getConsumerOffsetManager().getOffsetTable()
                .putAll(offsetWrapper.getOffsetTable());
            this.brokerController.getConsumerOffsetManager().persist();
            log.info("update slave consumer offset from master, {}", masterAddrBak);
        } catch (Exception e) {
            log.error("syncConsumerOffset Exception, " + masterAddrBak, e);
        }
    }


    /**
     * Writes the master's delay-offset content to the local delay-offset file.
     * Success is logged only when content was received and written.
     */
    private void syncDelayOffset() {
        final String masterAddrBak = this.masterAddr;
        if (masterAddrBak == null) {
            return;
        }
        try {
            String delayOffset =
                this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
            if (delayOffset != null) {
                String fileName =
                    StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
                        .getMessageStoreConfig().getStorePathRootDir());
                try {
                    MixAll.string2File(delayOffset, fileName);
                    log.info("update slave delay offset from master, {}", masterAddrBak);
                } catch (IOException e) {
                    log.error("persist file Exception, " + fileName, e);
                }
            }
        } catch (Exception e) {
            log.error("syncDelayOffset Exception, " + masterAddrBak, e);
        }
    }


    /** Replaces the local subscription-group table with the master's when the data versions differ. */
    private void syncSubscriptionGroupConfig() {
        final String masterAddrBak = this.masterAddr;
        if (masterAddrBak == null) {
            return;
        }
        try {
            SubscriptionGroupWrapper subscriptionWrapper =
                this.brokerController.getBrokerOuterAPI()
                    .getAllSubscriptionGroupConfig(masterAddrBak);

            if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
                .equals(subscriptionWrapper.getDataVersion())) {
                SubscriptionGroupManager subscriptionGroupManager =
                    this.brokerController.getSubscriptionGroupManager();
                replaceContents(subscriptionGroupManager.getSubscriptionGroupTable(),
                    subscriptionWrapper.getSubscriptionGroupTable());
                // Version is adopted after the table update; see syncTopicConfig.
                subscriptionGroupManager.getDataVersion().assignNewOne(
                    subscriptionWrapper.getDataVersion());
                subscriptionGroupManager.persist();
                log.info("update slave Subscription Group from master, {}", masterAddrBak);
            }
        } catch (Exception e) {
            log.error("syncSubscriptionGroup Exception, " + masterAddrBak, e);
        }
    }
}

// Patch origin: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
// File: broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java (added by this commit).
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.broker.subscription;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;


/**
 * Holds the broker's subscription-group table together with its data version, and persists both
 * through {@link ConfigManager}.
 *
 * @author shijia.wxr
 */
public class SubscriptionGroupManager extends ConfigManager {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

    private final ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
        new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
    private final DataVersion dataVersion = new DataVersion();
    /** Left {@code null} by the no-argument constructor. */
    private transient BrokerController brokerController;


    public SubscriptionGroupManager() {
        this.init();
    }


    public SubscriptionGroupManager(BrokerController brokerController) {
        this.brokerController = brokerController;
        this.init();
    }


    /** Pre-registers the built-in groups; the last four are registered with broadcast consumption enabled. */
    private void init() {
        this.registerBuiltinGroup(MixAll.TOOLS_CONSUMER_GROUP, false);
        this.registerBuiltinGroup(MixAll.FILTERSRV_CONSUMER_GROUP, false);
        this.registerBuiltinGroup(MixAll.SELF_TEST_CONSUMER_GROUP, false);
        this.registerBuiltinGroup(MixAll.ONS_HTTP_PROXY_GROUP, true);
        this.registerBuiltinGroup(MixAll.CID_ONSAPI_PULL_GROUP, true);
        this.registerBuiltinGroup(MixAll.CID_ONSAPI_PERMISSION_GROUP, true);
        this.registerBuiltinGroup(MixAll.CID_ONSAPI_OWNER_GROUP, true);
    }


    /**
     * Puts a default-configured group into the table.
     *
     * @param groupName       name (and table key) of the group
     * @param broadcastEnable when {@code true}, broadcast consumption is explicitly enabled;
     *                        when {@code false}, the config's own default is left untouched
     */
    private void registerBuiltinGroup(final String groupName, final boolean broadcastEnable) {
        SubscriptionGroupConfig config = new SubscriptionGroupConfig();
        config.setGroupName(groupName);
        if (broadcastEnable) {
            config.setConsumeBroadcastEnable(true);
        }
        this.subscriptionGroupTable.put(groupName, config);
    }


    /** Creates or replaces the config stored under {@code config.getGroupName()}, bumps the version and persists. */
    public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
        SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
        if (old != null) {
            log.info("update subscription group config, old: " + old + " new: " + config);
        } else {
            log.info("create new subscription group, " + config);
        }

        this.dataVersion.nextVersion();

        this.persist();
    }


    /** Turns consumption off for an existing group and bumps the version; unknown groups are ignored. */
    public void disableConsume(final String groupName) {
        SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);
        if (old != null) {
            old.setConsumeEnable(false);
            this.dataVersion.nextVersion();
        }
    }


    /**
     * Looks up a group, auto-creating it when the broker allows auto-creation or the name is a
     * system consumer group.
     *
     * @param group group name
     * @return the config stored in the table, or {@code null} if the group is unknown and may not be
     *         auto-created
     */
    public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
        final SubscriptionGroupConfig existing = this.subscriptionGroupTable.get(group);
        if (existing != null) {
            return existing;
        }

        // brokerController is null for instances built with the no-arg constructor.
        final boolean autoCreate = this.brokerController != null
            && this.brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup();
        if (!autoCreate && !MixAll.isSysConsumerGroup(group)) {
            return null;
        }

        final SubscriptionGroupConfig created = new SubscriptionGroupConfig();
        created.setGroupName(group);
        final SubscriptionGroupConfig raced = this.subscriptionGroupTable.putIfAbsent(group, created);
        if (raced != null) {
            // Another thread registered the group first: hand out the instance that is actually in
            // the table, and skip the version bump/persist since this call changed nothing.
            return raced;
        }

        log.info("auto create a subscription group, {}", created.toString());
        this.dataVersion.nextVersion();
        this.persist();
        return created;
    }


    @Override
    public String encode() {
        return this.encode(false);
    }


    /**
     * NOTE(review): the path is built from the {@code user.home} system property plus "store", not
     * from the broker's configured store root directory — confirm this is intentional.
     */
    @Override
    public String configFilePath() {
        return BrokerPathConfigHelper.getSubscriptionGroupPath(System.getProperty("user.home") + File.separator + "store");
    }


    /** Merges the groups and data version from a JSON snapshot into this manager; {@code null} is ignored. */
    @Override
    public void decode(String jsonString) {
        if (jsonString == null) {
            return;
        }
        SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
        if (obj != null) {
            this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);
            this.dataVersion.assignNewOne(obj.dataVersion);
            this.printLoadDataWhenFirstBoot(obj);
        }
    }


    public String encode(final boolean prettyFormat) {
        return RemotingSerializable.toJson(this, prettyFormat);
    }


    /** Logs every group contained in a freshly decoded snapshot. */
    private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {
        for (Entry<String, SubscriptionGroupConfig> entry : sgm.getSubscriptionGroupTable().entrySet()) {
            log.info("load exist subscription group, {}", entry.getValue().toString());
        }
    }


    public ConcurrentHashMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
        return subscriptionGroupTable;
    }


    public DataVersion getDataVersion() {
        return dataVersion;
    }


    /** Removes a group; bumps the version and persists only when something was actually removed. */
    public void deleteSubscriptionGroupConfig(final String groupName) {
        SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);
        if (old != null) {
            log.info("delete subscription group OK, subscription group: " + old);
            this.dataVersion.nextVersion();
            this.persist();
        } else {
            // "old" is always null on this branch; report the requested name instead.
            log.warn("delete subscription group failed, subscription group: " + groupName + " not exist");
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java new file mode 100644 index 0000000..40fdd68 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -0,0 +1,440 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.topic; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.BrokerPathConfigHelper; +import org.apache.rocketmq.common.ConfigManager; +import org.apache.rocketmq.common.DataVersion; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.protocol.body.KVTable; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.sysflag.TopicSysFlag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +/** + * @author shijia.wxr + */ +public class TopicConfigManager extends ConfigManager { + private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final long LOCK_TIMEOUT_MILLIS = 3000; + private transient final Lock lockTopicConfigTable = new ReentrantLock(); + + private final ConcurrentHashMap<String, TopicConfig> topicConfigTable = + new ConcurrentHashMap<String, TopicConfig>(1024); + private final DataVersion dataVersion = new DataVersion(); + private final Set<String> 
systemTopicList = new HashSet<String>(); + private transient BrokerController brokerController; + + + public TopicConfigManager() { + } + + + public TopicConfigManager(BrokerController brokerController) { + this.brokerController = brokerController; + { + // MixAll.SELF_TEST_TOPIC + String topic = MixAll.SELF_TEST_TOPIC; + TopicConfig topicConfig = new TopicConfig(topic); + this.systemTopicList.add(topic); + topicConfig.setReadQueueNums(1); + topicConfig.setWriteQueueNums(1); + this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + { + // MixAll.DEFAULT_TOPIC + if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { + String topic = MixAll.DEFAULT_TOPIC; + TopicConfig topicConfig = new TopicConfig(topic); + this.systemTopicList.add(topic); + topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig() + .getDefaultTopicQueueNums()); + topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig() + .getDefaultTopicQueueNums()); + int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE; + topicConfig.setPerm(perm); + this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + } + { + // MixAll.BENCHMARK_TOPIC + String topic = MixAll.BENCHMARK_TOPIC; + TopicConfig topicConfig = new TopicConfig(topic); + this.systemTopicList.add(topic); + topicConfig.setReadQueueNums(1024); + topicConfig.setWriteQueueNums(1024); + this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + { + + String topic = this.brokerController.getBrokerConfig().getBrokerClusterName(); + TopicConfig topicConfig = new TopicConfig(topic); + this.systemTopicList.add(topic); + int perm = PermName.PERM_INHERIT; + if (this.brokerController.getBrokerConfig().isClusterTopicEnable()) { + perm |= PermName.PERM_READ | PermName.PERM_WRITE; + } + topicConfig.setPerm(perm); + this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + { + + String topic = 
this.brokerController.getBrokerConfig().getBrokerName(); + TopicConfig topicConfig = new TopicConfig(topic); + this.systemTopicList.add(topic); + int perm = PermName.PERM_INHERIT; + if (this.brokerController.getBrokerConfig().isBrokerTopicEnable()) { + perm |= PermName.PERM_READ | PermName.PERM_WRITE; + } + topicConfig.setReadQueueNums(1); + topicConfig.setWriteQueueNums(1); + topicConfig.setPerm(perm); + this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + { + // MixAll.OFFSET_MOVED_EVENT + String topic = MixAll.OFFSET_MOVED_EVENT; + TopicConfig topicConfig = new TopicConfig(topic); + this.systemTopicList.add(topic); + topicConfig.setReadQueueNums(1); + topicConfig.setWriteQueueNums(1); + this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + } + + + public boolean isSystemTopic(final String topic) { + return this.systemTopicList.contains(topic); + } + + + public Set<String> getSystemTopic() { + return this.systemTopicList; + } + + + public boolean isTopicCanSendMessage(final String topic) { + return !topic.equals(MixAll.DEFAULT_TOPIC); + } + + + public TopicConfig selectTopicConfig(final String topic) { + return this.topicConfigTable.get(topic); + } + + + public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic, + final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) { + TopicConfig topicConfig = null; + boolean createNew = false; + + try { + if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + topicConfig = this.topicConfigTable.get(topic); + if (topicConfig != null) + return topicConfig; + + TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic); + if (defaultTopicConfig != null) { + if (defaultTopic.equals(MixAll.DEFAULT_TOPIC)) { + if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { + defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE); + } + } + + 
if (PermName.isInherited(defaultTopicConfig.getPerm())) { + topicConfig = new TopicConfig(topic); + + int queueNums = + clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig + .getWriteQueueNums() : clientDefaultTopicQueueNums; + + if (queueNums < 0) { + queueNums = 0; + } + + topicConfig.setReadQueueNums(queueNums); + topicConfig.setWriteQueueNums(queueNums); + int perm = defaultTopicConfig.getPerm(); + perm &= ~PermName.PERM_INHERIT; + topicConfig.setPerm(perm); + topicConfig.setTopicSysFlag(topicSysFlag); + topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType()); + } else { + LOG.warn("create new topic failed, because the default topic[" + defaultTopic + + "] no perm, " + defaultTopicConfig.getPerm() + " producer: " + + remoteAddress); + } + } else { + LOG.warn("create new topic failed, because the default topic[" + defaultTopic + + "] not exist." + " producer: " + remoteAddress); + } + + if (topicConfig != null) { + LOG.info("create new topic by default topic[" + defaultTopic + "], " + topicConfig + + " producer: " + remoteAddress); + + this.topicConfigTable.put(topic, topicConfig); + + this.dataVersion.nextVersion(); + + createNew = true; + + this.persist(); + } + } finally { + this.lockTopicConfigTable.unlock(); + } + } + } catch (InterruptedException e) { + LOG.error("createTopicInSendMessageMethod exception", e); + } + + if (createNew) { + this.brokerController.registerBrokerAll(false, true); + } + + return topicConfig; + } + + public TopicConfig createTopicInSendMessageBackMethod( + final String topic, + final int clientDefaultTopicQueueNums, + final int perm, + final int topicSysFlag) { + TopicConfig topicConfig = this.topicConfigTable.get(topic); + if (topicConfig != null) + return topicConfig; + + boolean createNew = false; + + try { + if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + topicConfig = this.topicConfigTable.get(topic); + if (topicConfig != null) 
+ return topicConfig; + + topicConfig = new TopicConfig(topic); + topicConfig.setReadQueueNums(clientDefaultTopicQueueNums); + topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums); + topicConfig.setPerm(perm); + topicConfig.setTopicSysFlag(topicSysFlag); + + LOG.info("create new topic {}", topicConfig); + this.topicConfigTable.put(topic, topicConfig); + createNew = true; + this.dataVersion.nextVersion(); + this.persist(); + } finally { + this.lockTopicConfigTable.unlock(); + } + } + } catch (InterruptedException e) { + LOG.error("createTopicInSendMessageBackMethod exception", e); + } + + if (createNew) { + this.brokerController.registerBrokerAll(false, true); + } + + return topicConfig; + } + + public void updateTopicUnitFlag(final String topic, final boolean unit) { + + TopicConfig topicConfig = this.topicConfigTable.get(topic); + if (topicConfig != null) { + int oldTopicSysFlag = topicConfig.getTopicSysFlag(); + if (unit) { + topicConfig.setTopicSysFlag(TopicSysFlag.setUnitFlag(oldTopicSysFlag)); + } else { + topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitFlag(oldTopicSysFlag)); + } + + LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag, + topicConfig.getTopicSysFlag()); + + this.topicConfigTable.put(topic, topicConfig); + + this.dataVersion.nextVersion(); + + this.persist(); + this.brokerController.registerBrokerAll(false, true); + } + } + + public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) { + TopicConfig topicConfig = this.topicConfigTable.get(topic); + if (topicConfig != null) { + int oldTopicSysFlag = topicConfig.getTopicSysFlag(); + if (hasUnitSub) { + topicConfig.setTopicSysFlag(TopicSysFlag.setUnitSubFlag(oldTopicSysFlag)); + } + + LOG.info("update topic sys flag. 
oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag, + topicConfig.getTopicSysFlag()); + + this.topicConfigTable.put(topic, topicConfig); + + this.dataVersion.nextVersion(); + + this.persist(); + this.brokerController.registerBrokerAll(false, true); + } + } + + public void updateTopicConfig(final TopicConfig topicConfig) { + TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + if (old != null) { + LOG.info("update topic config, old: " + old + " new: " + topicConfig); + } else { + LOG.info("create new topic, " + topicConfig); + } + + this.dataVersion.nextVersion(); + + this.persist(); + } + + + public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) { + + if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) { + boolean isChange = false; + Set<String> orderTopics = orderKVTableFromNs.getTable().keySet(); + for (String topic : orderTopics) { + TopicConfig topicConfig = this.topicConfigTable.get(topic); + if (topicConfig != null && !topicConfig.isOrder()) { + topicConfig.setOrder(true); + isChange = true; + LOG.info("update order topic config, topic={}, order={}", topic, true); + } + } + + for (Map.Entry<String, TopicConfig> entry : this.topicConfigTable.entrySet()) { + String topic = entry.getKey(); + if (!orderTopics.contains(topic)) { + TopicConfig topicConfig = entry.getValue(); + if (topicConfig.isOrder()) { + topicConfig.setOrder(false); + isChange = true; + LOG.info("update order topic config, topic={}, order={}", topic, false); + } + } + } + + if (isChange) { + this.dataVersion.nextVersion(); + this.persist(); + } + } + } + + public boolean isOrderTopic(final String topic) { + TopicConfig topicConfig = this.topicConfigTable.get(topic); + if (topicConfig == null) { + return false; + } else { + return topicConfig.isOrder(); + } + } + + public void deleteTopicConfig(final String topic) { + TopicConfig old = this.topicConfigTable.remove(topic); + if (old != null) { + LOG.info("delete topic config 
OK, topic: " + old); + this.dataVersion.nextVersion(); + this.persist(); + } else { + LOG.warn("delete topic config failed, topic: " + topic + " not exist"); + } + } + + public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable); + topicConfigSerializeWrapper.setDataVersion(this.dataVersion); + return topicConfigSerializeWrapper; + } + + @Override + public String encode() { + return encode(false); + } + + @Override + public String configFilePath() { +// return BrokerPathConfigHelper.getTopicConfigPath(this.brokerController.getMessageStoreConfig() +// .getStorePathRootDir()); + return BrokerPathConfigHelper.getTopicConfigPath(System.getProperty("user.home") + File.separator + "store"); + } + + @Override + public void decode(String jsonString) { + if (jsonString != null) { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = + TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class); + if (topicConfigSerializeWrapper != null) { + this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable()); + this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion()); + this.printLoadDataWhenFirstBoot(topicConfigSerializeWrapper); + } + } + } + + public String encode(final boolean prettyFormat) { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable); + topicConfigSerializeWrapper.setDataVersion(this.dataVersion); + return topicConfigSerializeWrapper.toJson(prettyFormat); + } + + private void printLoadDataWhenFirstBoot(final TopicConfigSerializeWrapper tcs) { + Iterator<Entry<String, TopicConfig>> it = tcs.getTopicConfigTable().entrySet().iterator(); + while (it.hasNext()) { + Entry<String, TopicConfig> next = it.next(); + 
LOG.info("load exist local topic, {}", next.getValue().toString()); + } + } + + public DataVersion getDataVersion() { + return dataVersion; + } + + public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() { + return topicConfigTable; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java new file mode 100644 index 0000000..68256d9 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Plain mutable value holder pairing a commit-log offset with the name of a producer group.
 */
public class TransactionRecord {
    /** Commit Log Offset. */
    private long offset;

    /** Name of the producer group this record belongs to. */
    private String producerGroup;


    /** @return the producer group name, {@code null} until one has been set */
    public String getProducerGroup() {
        return this.producerGroup;
    }


    /** @param producerGroup the producer group name to store */
    public void setProducerGroup(String producerGroup) {
        this.producerGroup = producerGroup;
    }


    /** @return the commit-log offset, 0 until one has been set */
    public long getOffset() {
        return this.offset;
    }


    /** @param offset the commit-log offset to store */
    public void setOffset(long offset) {
        this.offset = offset;
    }
}
+ */ + +package org.apache.rocketmq.broker.transaction; + +import java.util.List; + + +public interface TransactionStore { + public boolean open(); + + + public void close(); + + + public boolean put(final List<TransactionRecord> trs); + + + public void remove(final List<Long> pks); + + + public List<TransactionRecord> traverse(final long pk, final int nums); + + + public long totalRecords(); + + + public long minPK(); + + + public long maxPK(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java new file mode 100644 index 0000000..4bf73d2 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.broker.transaction.jdbc; + +import org.apache.rocketmq.broker.transaction.TransactionRecord; +import org.apache.rocketmq.broker.transaction.TransactionStore; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.sql.*; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; + + +public class JDBCTransactionStore implements TransactionStore { + private static final Logger log = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); + private final JDBCTransactionStoreConfig jdbcTransactionStoreConfig; + private Connection connection; + private AtomicLong totalRecordsValue = new AtomicLong(0); + + public JDBCTransactionStore(JDBCTransactionStoreConfig jdbcTransactionStoreConfig) { + this.jdbcTransactionStoreConfig = jdbcTransactionStoreConfig; + } + + @Override + public boolean open() { + if (this.loadDriver()) { + Properties props = new Properties(); + props.put("user", jdbcTransactionStoreConfig.getJdbcUser()); + props.put("password", jdbcTransactionStoreConfig.getJdbcPassword()); + + try { + this.connection = + DriverManager.getConnection(this.jdbcTransactionStoreConfig.getJdbcURL(), props); + + this.connection.setAutoCommit(false); + + + if (!this.computeTotalRecords()) { + return this.createDB(); + } + + return true; + } catch (SQLException e) { + log.info("Create JDBC Connection Exeption", e); + } + } + + return false; + } + + private boolean loadDriver() { + try { + Class.forName(this.jdbcTransactionStoreConfig.getJdbcDriverClass()).newInstance(); + log.info("Loaded the appropriate driver, {}", + this.jdbcTransactionStoreConfig.getJdbcDriverClass()); + return true; + } catch (Exception e) { + log.info("Loaded the appropriate driver Exception", e); + } + + return false; + } + + private boolean computeTotalRecords() { + Statement 
statement = null; + ResultSet resultSet = null; + try { + statement = this.connection.createStatement(); + + resultSet = statement.executeQuery("select count(offset) as total from t_transaction"); + if (!resultSet.next()) { + log.warn("computeTotalRecords ResultSet is empty"); + return false; + } + + this.totalRecordsValue.set(resultSet.getLong(1)); + } catch (Exception e) { + log.warn("computeTotalRecords Exception", e); + return false; + } finally { + if (null != statement) { + try { + statement.close(); + } catch (SQLException e) { + } + } + + if (null != resultSet) { + try { + resultSet.close(); + } catch (SQLException e) { + } + } + } + + return true; + } + + private boolean createDB() { + Statement statement = null; + try { + statement = this.connection.createStatement(); + + String sql = this.createTableSql(); + log.info("createDB SQL:\n {}", sql); + statement.execute(sql); + this.connection.commit(); + return true; + } catch (Exception e) { + log.warn("createDB Exception", e); + return false; + } finally { + if (null != statement) { + try { + statement.close(); + } catch (SQLException e) { + log.warn("Close statement exception", e); + } + } + } + } + + private String createTableSql() { + URL resource = JDBCTransactionStore.class.getClassLoader().getResource("transaction.sql"); + String fileContent = MixAll.file2String(resource); + return fileContent; + } + + @Override + public void close() { + try { + if (this.connection != null) { + this.connection.close(); + } + } catch (SQLException e) { + } + } + + @Override + public boolean put(List<TransactionRecord> trs) { + PreparedStatement statement = null; + try { + this.connection.setAutoCommit(false); + statement = this.connection.prepareStatement("insert into t_transaction values (?, ?)"); + for (TransactionRecord tr : trs) { + statement.setLong(1, tr.getOffset()); + statement.setString(2, tr.getProducerGroup()); + statement.addBatch(); + } + int[] executeBatch = statement.executeBatch(); + 
this.connection.commit(); + this.totalRecordsValue.addAndGet(updatedRows(executeBatch)); + return true; + } catch (Exception e) { + log.warn("createDB Exception", e); + return false; + } finally { + if (null != statement) { + try { + statement.close(); + } catch (SQLException e) { + log.warn("Close statement exception", e); + } + } + } + } + + private long updatedRows(int[] rows) { + long res = 0; + for (int i : rows) { + res += i; + } + + return res; + } + + @Override + public void remove(List<Long> pks) { + PreparedStatement statement = null; + try { + this.connection.setAutoCommit(false); + statement = this.connection.prepareStatement("DELETE FROM t_transaction WHERE offset = ?"); + for (long pk : pks) { + statement.setLong(1, pk); + statement.addBatch(); + } + int[] executeBatch = statement.executeBatch(); + this.connection.commit(); + } catch (Exception e) { + log.warn("createDB Exception", e); + } finally { + if (null != statement) { + try { + statement.close(); + } catch (SQLException e) { + } + } + } + } + + @Override + public List<TransactionRecord> traverse(long pk, int nums) { + return null; + } + + @Override + public long totalRecords() { + return this.totalRecordsValue.get(); + } + + @Override + public long minPK() { + return 0; + } + + @Override + public long maxPK() { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java new file mode 100644 index 0000000..5789329 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software 
/**
 * Mutable connection settings for the JDBC transaction store: driver class, URL, user and
 * password. Every field starts with a placeholder default.
 */
public class JDBCTransactionStoreConfig {
    /** Fully qualified name of the JDBC driver class. */
    private String jdbcDriverClass = "com.mysql.jdbc.Driver";

    /** JDBC connection URL. */
    private String jdbcURL = "jdbc:mysql://xxx.xxx.xxx.xxx:1000/xxx?useUnicode=true&characterEncoding=UTF-8";

    /** Database user name. */
    private String jdbcUser = "xxx";

    /** Database password. */
    private String jdbcPassword = "xxx";


    /** @return the driver class name */
    public String getJdbcDriverClass() {
        return this.jdbcDriverClass;
    }


    /** @param jdbcDriverClass the driver class name to use */
    public void setJdbcDriverClass(String jdbcDriverClass) {
        this.jdbcDriverClass = jdbcDriverClass;
    }


    /** @return the connection URL */
    public String getJdbcURL() {
        return this.jdbcURL;
    }


    /** @param jdbcURL the connection URL to use */
    public void setJdbcURL(String jdbcURL) {
        this.jdbcURL = jdbcURL;
    }


    /** @return the database user name */
    public String getJdbcUser() {
        return this.jdbcUser;
    }


    /** @param jdbcUser the database user name to use */
    public void setJdbcUser(String jdbcUser) {
        this.jdbcUser = jdbcUser;
    }


    /** @return the database password */
    public String getJdbcPassword() {
        return this.jdbcPassword;
    }


    /** @param jdbcPassword the database password to use */
    public void setJdbcPassword(String jdbcPassword) {
        this.jdbcPassword = jdbcPassword;
    }
}
a/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java deleted file mode 100644 index b661385..0000000 --- a/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.broker; - -import com.alibaba.rocketmq.common.BrokerConfig; -import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; -import com.alibaba.rocketmq.remoting.netty.NettyServerConfig; -import com.alibaba.rocketmq.store.config.MessageStoreConfig; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author shtykh_roman - */ -public class BrokerControllerTest { - protected Logger logger = LoggerFactory.getLogger(BrokerControllerTest.class); - - private static final int RESTART_NUM = 3; - - /** - * Tests if the controller can be properly stopped and started. - * - * @throws Exception If fails. 
- */ - @Test - public void testRestart() throws Exception { - - for (int i = 0; i < RESTART_NUM; i++) { - BrokerController brokerController = new BrokerController(// - new BrokerConfig(), // - new NettyServerConfig(), // - new NettyClientConfig(), // - new MessageStoreConfig()); - boolean initResult = brokerController.initialize(); - Assert.assertTrue(initResult); - logger.info("Broker is initialized " + initResult); - brokerController.start(); - logger.info("Broker is started"); - - brokerController.shutdown(); - logger.info("Broker is stopped"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerTestHarness.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerTestHarness.java b/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerTestHarness.java deleted file mode 100644 index ca6f17b..0000000 --- a/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerTestHarness.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/** - * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ - */ -package com.alibaba.rocketmq.broker; - -import com.alibaba.rocketmq.common.BrokerConfig; -import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; -import com.alibaba.rocketmq.remoting.netty.NettyServerConfig; -import com.alibaba.rocketmq.store.config.MessageStoreConfig; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.Random; - -/** - * @author zander - */ -public class BrokerTestHarness { - - protected BrokerController brokerController = null; - - protected Random random = new Random(); - public final String BROKER_NAME = "TestBrokerName"; - protected String brokerAddr = ""; - protected Logger logger = LoggerFactory.getLogger(BrokerTestHarness.class); - protected BrokerConfig brokerConfig = new BrokerConfig(); - protected NettyServerConfig nettyServerConfig = new NettyServerConfig(); - protected NettyClientConfig nettyClientConfig = new NettyClientConfig(); - protected MessageStoreConfig storeConfig = new MessageStoreConfig(); - - @Before - public void startup() throws Exception { - brokerConfig.setBrokerName(BROKER_NAME); - brokerConfig.setBrokerIP1("127.0.0.1"); - storeConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "unitteststore"); - storeConfig.setStorePathCommitLog(System.getProperty("user.home") + File.separator + "unitteststore" + File.separator + "commitlog"); - nettyServerConfig.setListenPort(10000 + random.nextInt(1000)); - brokerAddr = brokerConfig.getBrokerIP1() + ":" + nettyServerConfig.getListenPort(); - brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig); - boolean initResult = brokerController.initialize(); - Assert.assertTrue(initResult); - logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr()); 
- brokerController.start(); - } - - @After - public void shutdown() throws Exception { - if (brokerController != null) { - brokerController.shutdown(); - } - //maybe need to clean the file store. But we do not suggest deleting anything. - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java b/broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java deleted file mode 100644 index cf97876..0000000 --- a/broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/** - * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ - */ -package com.alibaba.rocketmq.broker.api; - -import com.alibaba.rocketmq.broker.BrokerTestHarness; -import com.alibaba.rocketmq.client.ClientConfig; -import com.alibaba.rocketmq.client.hook.SendMessageContext; -import com.alibaba.rocketmq.client.impl.CommunicationMode; -import com.alibaba.rocketmq.client.impl.MQClientAPIImpl; -import com.alibaba.rocketmq.client.producer.SendResult; -import com.alibaba.rocketmq.client.producer.SendStatus; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.common.message.MessageDecoder; -import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader; -import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.*; - - -/** - * @author zander - */ -public class SendMessageTest extends BrokerTestHarness{ - - MQClientAPIImpl client = new MQClientAPIImpl(new NettyClientConfig(), null, null, new ClientConfig()); - String topic = "UnitTestTopic"; - - @Before - @Override - public void startup() throws Exception { - super.startup(); - client.start(); - - } - - @After - @Override - public void shutdown() throws Exception { - client.shutdown(); - super.shutdown(); - } - - @Test - public void testSendSingle() throws Exception{ - Message msg = new Message(topic, "TAG1 TAG2", "100200300", "body".getBytes()); - SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); - requestHeader.setProducerGroup("abc"); - requestHeader.setTopic(msg.getTopic()); - requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC); - requestHeader.setDefaultTopicQueueNums(4); - requestHeader.setQueueId(0); - requestHeader.setSysFlag(0); - requestHeader.setBornTimestamp(System.currentTimeMillis()); - requestHeader.setFlag(msg.getFlag()); - 
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); - - SendResult result = client.sendMessage(brokerAddr, BROKER_NAME, msg, requestHeader, 1000 * 5, - CommunicationMode.SYNC, new SendMessageContext(), null); - assertEquals(result.getSendStatus(), SendStatus.SEND_OK); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java deleted file mode 100644 index 94504a4..0000000 --- a/broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/** - * $Id: ConsumerOffsetManagerTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ - */ -package com.alibaba.rocketmq.broker.offset; - -import com.alibaba.rocketmq.broker.BrokerTestHarness; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - - -/** - * @author zander - */ -public class ConsumerOffsetManagerTest extends BrokerTestHarness{ - - @Test - public void testFlushConsumerOffset() throws Exception { - ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController); - for (int i = 0; i < 10; i++) { - String group = "UNIT_TEST_GROUP_" + i; - for (int id = 0; id < 10; id++) { - consumerOffsetManager.commitOffset(null, group, "TOPIC_A", id, id + 100); - consumerOffsetManager.commitOffset(null, group, "TOPIC_B", id, id + 100); - consumerOffsetManager.commitOffset(null, group, "TOPIC_C", id, id + 100); - } - } - consumerOffsetManager.persist(); - consumerOffsetManager.getOffsetTable().clear(); - for (int i = 0; i < 10; i++) { - String group = "UNIT_TEST_GROUP_" + i; - for (int id = 0; id < 10; id++) { - assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_A", id), -1); - assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), -1); - assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), -1); - } - } - consumerOffsetManager.load(); - for (int i = 0; i < 10; i++) { - String group = "UNIT_TEST_GROUP_" + i; - for (int id = 0; id < 10; id++) { - assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_A", id), id + 100); - assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), id + 100); - assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), id + 100); - } - } - } -}