// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java
// ----------------------------------------------------------------------
// diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java
// new file mode 100644
// index 0000000..d954a46
// --- /dev/null
// +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java
// @@ -0,0 +1,203 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.broker.subscription;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.BrokerPathConfigHelper;
import com.alibaba.rocketmq.common.ConfigManager;
import com.alibaba.rocketmq.common.DataVersion;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;


/**
 * Keeps the table of consumer subscription groups known to this broker, keyed by group name,
 * together with a {@link DataVersion} that is advanced on every change made through this class.
 * Changes are written out through {@code persist()}, which this class calls but does not define
 * (presumably inherited from {@link ConfigManager} - confirm there).
 *
 * @author shijia.wxr
 */
public class SubscriptionGroupManager extends ConfigManager {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

    private final ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
        new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
    private final DataVersion dataVersion = new DataVersion();
    // Null when the instance was created through the no-arg constructor.
    private transient BrokerController brokerController;


    /**
     * Creates a manager without a broker controller; only the built-in groups are registered.
     */
    public SubscriptionGroupManager() {
        this.init();
    }


    /**
     * Creates a manager bound to the given broker controller and registers the built-in groups.
     *
     * @param brokerController controller whose broker config decides whether unknown groups are
     *                         auto-created in {@link #findSubscriptionGroupConfig(String)}
     */
    public SubscriptionGroupManager(BrokerController brokerController) {
        this.brokerController = brokerController;
        this.init();
    }


    /**
     * Registers the built-in groups. The last four are registered with broadcast consumption
     * explicitly enabled; the first three keep whatever default {@link SubscriptionGroupConfig}
     * provides.
     */
    private void init() {
        this.registerBuiltInGroup(MixAll.TOOLS_CONSUMER_GROUP, false);
        this.registerBuiltInGroup(MixAll.FILTERSRV_CONSUMER_GROUP, false);
        this.registerBuiltInGroup(MixAll.SELF_TEST_CONSUMER_GROUP, false);
        this.registerBuiltInGroup(MixAll.ONS_HTTP_PROXY_GROUP, true);
        this.registerBuiltInGroup(MixAll.CID_ONSAPI_PULL_GROUP, true);
        this.registerBuiltInGroup(MixAll.CID_ONSAPI_PERMISSION_GROUP, true);
        this.registerBuiltInGroup(MixAll.CID_ONSAPI_OWNER_GROUP, true);
    }


    /**
     * Puts a fresh config for {@code groupName} into the table.
     *
     * @param groupName       name of the group, also used as the table key
     * @param enableBroadcast when true, broadcast consumption is switched on explicitly;
     *                        when false the setter is not called at all
     */
    private void registerBuiltInGroup(final String groupName, final boolean enableBroadcast) {
        SubscriptionGroupConfig config = new SubscriptionGroupConfig();
        config.setGroupName(groupName);
        if (enableBroadcast) {
            config.setConsumeBroadcastEnable(true);
        }
        this.subscriptionGroupTable.put(groupName, config);
    }


    /**
     * Inserts or replaces the config stored under {@code config.getGroupName()}, advances the
     * data version and persists.
     *
     * @param config the new group config
     */
    public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
        SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
        if (old != null) {
            log.info("update subscription group config, old: {} new: {}", old, config);
        } else {
            log.info("create new subscription group, {}", config);
        }

        this.dataVersion.nextVersion();

        this.persist();
    }


    /**
     * Marks an existing group as not consumable and advances the data version. Unknown groups
     * are ignored. Note that this method does not call {@code persist()}.
     *
     * @param groupName name of the group to disable
     */
    public void disableConsume(final String groupName) {
        SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);
        if (old != null) {
            old.setConsumeEnable(false);
            this.dataVersion.nextVersion();
        }
    }


    /**
     * Looks up a group, creating it on demand when the broker config enables auto-creation or
     * when {@link MixAll#isSysConsumerGroup(String)} accepts the name.
     *
     * @param group group name
     * @return the stored config, or {@code null} when the group is unknown and may not be
     *         auto-created
     */
    public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
        SubscriptionGroupConfig existing = this.subscriptionGroupTable.get(group);
        if (existing != null) {
            return existing;
        }

        // brokerController is null for instances built with the no-arg constructor.
        boolean autoCreate = this.brokerController != null
            && this.brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup();
        if (!autoCreate && !MixAll.isSysConsumerGroup(group)) {
            return null;
        }

        SubscriptionGroupConfig created = new SubscriptionGroupConfig();
        created.setGroupName(group);
        SubscriptionGroupConfig winner = this.subscriptionGroupTable.putIfAbsent(group, created);
        if (winner != null) {
            // Another thread registered the group first: hand out the stored instance and
            // leave the data version / persisted file untouched, since nothing changed.
            return winner;
        }

        log.info("auto create a subscription group, {}", created.toString());
        this.dataVersion.nextVersion();
        this.persist();
        return created;
    }


    @Override
    public String encode() {
        return this.encode(false);
    }


    /**
     * Returns the path of the subscription group file. The store root is fixed to
     * {@code ${user.home}/store}; the store path from the broker's message store config is not
     * consulted (that call is commented out below).
     */
    @Override
    public String configFilePath() {
        //return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
        return BrokerPathConfigHelper.getSubscriptionGroupPath(System.getProperty("user.home") + File.separator + "store");
    }


    /**
     * Merges the groups and data version found in {@code jsonString} into this instance.
     * A {@code null} string, or a string that deserializes to {@code null}, is ignored.
     */
    @Override
    public void decode(String jsonString) {
        if (jsonString == null) {
            return;
        }

        SubscriptionGroupManager loaded = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
        if (loaded != null) {
            this.subscriptionGroupTable.putAll(loaded.subscriptionGroupTable);
            this.dataVersion.assignNewOne(loaded.dataVersion);
            this.printLoadDataWhenFirstBoot(loaded);
        }
    }


    /**
     * Serializes this manager to JSON.
     *
     * @param prettyFormat passed through to {@code RemotingSerializable.toJson}
     */
    public String encode(final boolean prettyFormat) {
        return RemotingSerializable.toJson(this, prettyFormat);
    }


    /** Logs every group held by {@code sgm}. */
    private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {
        for (Entry<String, SubscriptionGroupConfig> entry : sgm.getSubscriptionGroupTable().entrySet()) {
            log.info("load exist subscription group, {}", entry.getValue().toString());
        }
    }


    /** Returns the live (not copied) group table. */
    public ConcurrentHashMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
        return subscriptionGroupTable;
    }


    public DataVersion getDataVersion() {
        return dataVersion;
    }


    /**
     * Removes a group; when it existed, advances the data version and persists.
     *
     * @param groupName name of the group to delete
     */
    public void deleteSubscriptionGroupConfig(final String groupName) {
        SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);
        if (old != null) {
            log.info("delete subscription group OK, subscription group: {}", old);
            this.dataVersion.nextVersion();
            this.persist();
        } else {
            // "old" is null on this branch, so the requested name is what must be reported.
            log.warn("delete subscription group failed, subscription group: {} not exist", groupName);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java new file mode 100644 index 0000000..94d7e9f --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java @@ -0,0 +1,440 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.broker.topic; + +import com.alibaba.rocketmq.broker.BrokerController; +import com.alibaba.rocketmq.broker.BrokerPathConfigHelper; +import com.alibaba.rocketmq.common.ConfigManager; +import com.alibaba.rocketmq.common.DataVersion; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.TopicConfig; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.constant.PermName; +import com.alibaba.rocketmq.common.protocol.body.KVTable; +import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import com.alibaba.rocketmq.common.sysflag.TopicSysFlag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +/** + * @author shijia.wxr + */ +public class TopicConfigManager extends ConfigManager { + private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final long LOCK_TIMEOUT_MILLIS = 3000; + private transient final Lock lockTopicConfigTable = new ReentrantLock(); + + private final ConcurrentHashMap<String, TopicConfig> topicConfigTable = + new ConcurrentHashMap<String, TopicConfig>(1024); + private final DataVersion dataVersion = new DataVersion(); + private final Set<String> systemTopicList = new HashSet<String>(); + private transient BrokerController brokerController; + + + public TopicConfigManager() { + } + + + public TopicConfigManager(BrokerController brokerController) { + this.brokerController = brokerController; + { + // MixAll.SELF_TEST_TOPIC + String topic = MixAll.SELF_TEST_TOPIC; + TopicConfig topicConfig = new TopicConfig(topic); + this.systemTopicList.add(topic); 
+ topicConfig.setReadQueueNums(1); + topicConfig.setWriteQueueNums(1); + this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + { + // MixAll.DEFAULT_TOPIC + if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { + String topic = MixAll.DEFAULT_TOPIC; + TopicConfig topicConfig = new TopicConfig(topic); + this.systemTopicList.add(topic); + topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig() + .getDefaultTopicQueueNums()); + topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig() + .getDefaultTopicQueueNums()); + int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE; + topicConfig.setPerm(perm); + this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + } + { + // MixAll.BENCHMARK_TOPIC + String topic = MixAll.BENCHMARK_TOPIC; + TopicConfig topicConfig = new TopicConfig(topic); + this.systemTopicList.add(topic); + topicConfig.setReadQueueNums(1024); + topicConfig.setWriteQueueNums(1024); + this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + { + + String topic = this.brokerController.getBrokerConfig().getBrokerClusterName(); + TopicConfig topicConfig = new TopicConfig(topic); + this.systemTopicList.add(topic); + int perm = PermName.PERM_INHERIT; + if (this.brokerController.getBrokerConfig().isClusterTopicEnable()) { + perm |= PermName.PERM_READ | PermName.PERM_WRITE; + } + topicConfig.setPerm(perm); + this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + { + + String topic = this.brokerController.getBrokerConfig().getBrokerName(); + TopicConfig topicConfig = new TopicConfig(topic); + this.systemTopicList.add(topic); + int perm = PermName.PERM_INHERIT; + if (this.brokerController.getBrokerConfig().isBrokerTopicEnable()) { + perm |= PermName.PERM_READ | PermName.PERM_WRITE; + } + topicConfig.setReadQueueNums(1); + topicConfig.setWriteQueueNums(1); + topicConfig.setPerm(perm); + 
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + { + // MixAll.OFFSET_MOVED_EVENT + String topic = MixAll.OFFSET_MOVED_EVENT; + TopicConfig topicConfig = new TopicConfig(topic); + this.systemTopicList.add(topic); + topicConfig.setReadQueueNums(1); + topicConfig.setWriteQueueNums(1); + this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + } + + + public boolean isSystemTopic(final String topic) { + return this.systemTopicList.contains(topic); + } + + + public Set<String> getSystemTopic() { + return this.systemTopicList; + } + + + public boolean isTopicCanSendMessage(final String topic) { + return !topic.equals(MixAll.DEFAULT_TOPIC); + } + + + public TopicConfig selectTopicConfig(final String topic) { + return this.topicConfigTable.get(topic); + } + + + public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic, + final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) { + TopicConfig topicConfig = null; + boolean createNew = false; + + try { + if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + topicConfig = this.topicConfigTable.get(topic); + if (topicConfig != null) + return topicConfig; + + TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic); + if (defaultTopicConfig != null) { + if (defaultTopic.equals(MixAll.DEFAULT_TOPIC)) { + if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { + defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE); + } + } + + if (PermName.isInherited(defaultTopicConfig.getPerm())) { + topicConfig = new TopicConfig(topic); + + int queueNums = + clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? 
defaultTopicConfig + .getWriteQueueNums() : clientDefaultTopicQueueNums; + + if (queueNums < 0) { + queueNums = 0; + } + + topicConfig.setReadQueueNums(queueNums); + topicConfig.setWriteQueueNums(queueNums); + int perm = defaultTopicConfig.getPerm(); + perm &= ~PermName.PERM_INHERIT; + topicConfig.setPerm(perm); + topicConfig.setTopicSysFlag(topicSysFlag); + topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType()); + } else { + LOG.warn("create new topic failed, because the default topic[" + defaultTopic + + "] no perm, " + defaultTopicConfig.getPerm() + " producer: " + + remoteAddress); + } + } else { + LOG.warn("create new topic failed, because the default topic[" + defaultTopic + + "] not exist." + " producer: " + remoteAddress); + } + + if (topicConfig != null) { + LOG.info("create new topic by default topic[" + defaultTopic + "], " + topicConfig + + " producer: " + remoteAddress); + + this.topicConfigTable.put(topic, topicConfig); + + this.dataVersion.nextVersion(); + + createNew = true; + + this.persist(); + } + } finally { + this.lockTopicConfigTable.unlock(); + } + } + } catch (InterruptedException e) { + LOG.error("createTopicInSendMessageMethod exception", e); + } + + if (createNew) { + this.brokerController.registerBrokerAll(false, true); + } + + return topicConfig; + } + + public TopicConfig createTopicInSendMessageBackMethod( + final String topic, + final int clientDefaultTopicQueueNums, + final int perm, + final int topicSysFlag) { + TopicConfig topicConfig = this.topicConfigTable.get(topic); + if (topicConfig != null) + return topicConfig; + + boolean createNew = false; + + try { + if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + topicConfig = this.topicConfigTable.get(topic); + if (topicConfig != null) + return topicConfig; + + topicConfig = new TopicConfig(topic); + topicConfig.setReadQueueNums(clientDefaultTopicQueueNums); + topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums); + 
topicConfig.setPerm(perm); + topicConfig.setTopicSysFlag(topicSysFlag); + + LOG.info("create new topic {}", topicConfig); + this.topicConfigTable.put(topic, topicConfig); + createNew = true; + this.dataVersion.nextVersion(); + this.persist(); + } finally { + this.lockTopicConfigTable.unlock(); + } + } + } catch (InterruptedException e) { + LOG.error("createTopicInSendMessageBackMethod exception", e); + } + + if (createNew) { + this.brokerController.registerBrokerAll(false, true); + } + + return topicConfig; + } + + public void updateTopicUnitFlag(final String topic, final boolean unit) { + + TopicConfig topicConfig = this.topicConfigTable.get(topic); + if (topicConfig != null) { + int oldTopicSysFlag = topicConfig.getTopicSysFlag(); + if (unit) { + topicConfig.setTopicSysFlag(TopicSysFlag.setUnitFlag(oldTopicSysFlag)); + } else { + topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitFlag(oldTopicSysFlag)); + } + + LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag, + topicConfig.getTopicSysFlag()); + + this.topicConfigTable.put(topic, topicConfig); + + this.dataVersion.nextVersion(); + + this.persist(); + this.brokerController.registerBrokerAll(false, true); + } + } + + public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) { + TopicConfig topicConfig = this.topicConfigTable.get(topic); + if (topicConfig != null) { + int oldTopicSysFlag = topicConfig.getTopicSysFlag(); + if (hasUnitSub) { + topicConfig.setTopicSysFlag(TopicSysFlag.setUnitSubFlag(oldTopicSysFlag)); + } + + LOG.info("update topic sys flag. 
oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag, + topicConfig.getTopicSysFlag()); + + this.topicConfigTable.put(topic, topicConfig); + + this.dataVersion.nextVersion(); + + this.persist(); + this.brokerController.registerBrokerAll(false, true); + } + } + + public void updateTopicConfig(final TopicConfig topicConfig) { + TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + if (old != null) { + LOG.info("update topic config, old: " + old + " new: " + topicConfig); + } else { + LOG.info("create new topic, " + topicConfig); + } + + this.dataVersion.nextVersion(); + + this.persist(); + } + + + public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) { + + if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) { + boolean isChange = false; + Set<String> orderTopics = orderKVTableFromNs.getTable().keySet(); + for (String topic : orderTopics) { + TopicConfig topicConfig = this.topicConfigTable.get(topic); + if (topicConfig != null && !topicConfig.isOrder()) { + topicConfig.setOrder(true); + isChange = true; + LOG.info("update order topic config, topic={}, order={}", topic, true); + } + } + + for (Map.Entry<String, TopicConfig> entry : this.topicConfigTable.entrySet()) { + String topic = entry.getKey(); + if (!orderTopics.contains(topic)) { + TopicConfig topicConfig = entry.getValue(); + if (topicConfig.isOrder()) { + topicConfig.setOrder(false); + isChange = true; + LOG.info("update order topic config, topic={}, order={}", topic, false); + } + } + } + + if (isChange) { + this.dataVersion.nextVersion(); + this.persist(); + } + } + } + + public boolean isOrderTopic(final String topic) { + TopicConfig topicConfig = this.topicConfigTable.get(topic); + if (topicConfig == null) { + return false; + } else { + return topicConfig.isOrder(); + } + } + + public void deleteTopicConfig(final String topic) { + TopicConfig old = this.topicConfigTable.remove(topic); + if (old != null) { + LOG.info("delete topic config 
OK, topic: " + old); + this.dataVersion.nextVersion(); + this.persist(); + } else { + LOG.warn("delete topic config failed, topic: " + topic + " not exist"); + } + } + + public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable); + topicConfigSerializeWrapper.setDataVersion(this.dataVersion); + return topicConfigSerializeWrapper; + } + + @Override + public String encode() { + return encode(false); + } + + @Override + public String configFilePath() { +// return BrokerPathConfigHelper.getTopicConfigPath(this.brokerController.getMessageStoreConfig() +// .getStorePathRootDir()); + return BrokerPathConfigHelper.getTopicConfigPath(System.getProperty("user.home") + File.separator + "store"); + } + + @Override + public void decode(String jsonString) { + if (jsonString != null) { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = + TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class); + if (topicConfigSerializeWrapper != null) { + this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable()); + this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion()); + this.printLoadDataWhenFirstBoot(topicConfigSerializeWrapper); + } + } + } + + public String encode(final boolean prettyFormat) { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable); + topicConfigSerializeWrapper.setDataVersion(this.dataVersion); + return topicConfigSerializeWrapper.toJson(prettyFormat); + } + + private void printLoadDataWhenFirstBoot(final TopicConfigSerializeWrapper tcs) { + Iterator<Entry<String, TopicConfig>> it = tcs.getTopicConfigTable().entrySet().iterator(); + while (it.hasNext()) { + Entry<String, TopicConfig> next = it.next(); + 
LOG.info("load exist local topic, {}", next.getValue().toString()); + } + } + + public DataVersion getDataVersion() { + return dataVersion; + } + + public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() { + return topicConfigTable; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java new file mode 100644 index 0000000..4328cf8 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.broker.transaction; + +public class TransactionRecord { + // Commit Log Offset + private long offset; + private String producerGroup; + + + public long getOffset() { + return offset; + } + + + public void setOffset(long offset) { + this.offset = offset; + } + + + public String getProducerGroup() { + return producerGroup; + } + + + public void setProducerGroup(String producerGroup) { + this.producerGroup = producerGroup; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java new file mode 100644 index 0000000..9d977ab --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.broker.transaction; + +import java.util.List; + + +public interface TransactionStore { + public boolean open(); + + + public void close(); + + + public boolean put(final List<TransactionRecord> trs); + + + public void remove(final List<Long> pks); + + + public List<TransactionRecord> traverse(final long pk, final int nums); + + + public long totalRecords(); + + + public long minPK(); + + + public long maxPK(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java new file mode 100644 index 0000000..47de33b --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.broker.transaction.jdbc; + +import com.alibaba.rocketmq.broker.transaction.TransactionRecord; +import com.alibaba.rocketmq.broker.transaction.TransactionStore; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.constant.LoggerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.sql.*; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; + + +public class JDBCTransactionStore implements TransactionStore { + private static final Logger log = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); + private final JDBCTransactionStoreConfig jdbcTransactionStoreConfig; + private Connection connection; + private AtomicLong totalRecordsValue = new AtomicLong(0); + + public JDBCTransactionStore(JDBCTransactionStoreConfig jdbcTransactionStoreConfig) { + this.jdbcTransactionStoreConfig = jdbcTransactionStoreConfig; + } + + @Override + public boolean open() { + if (this.loadDriver()) { + Properties props = new Properties(); + props.put("user", jdbcTransactionStoreConfig.getJdbcUser()); + props.put("password", jdbcTransactionStoreConfig.getJdbcPassword()); + + try { + this.connection = + DriverManager.getConnection(this.jdbcTransactionStoreConfig.getJdbcURL(), props); + + this.connection.setAutoCommit(false); + + + if (!this.computeTotalRecords()) { + return this.createDB(); + } + + return true; + } catch (SQLException e) { + log.info("Create JDBC Connection Exeption", e); + } + } + + return false; + } + + private boolean loadDriver() { + try { + Class.forName(this.jdbcTransactionStoreConfig.getJdbcDriverClass()).newInstance(); + log.info("Loaded the appropriate driver, {}", + this.jdbcTransactionStoreConfig.getJdbcDriverClass()); + return true; + } catch (Exception e) { + log.info("Loaded the appropriate driver Exception", e); + } + + return false; + } + + private boolean computeTotalRecords() { + Statement 
statement = null; + ResultSet resultSet = null; + try { + statement = this.connection.createStatement(); + + resultSet = statement.executeQuery("select count(offset) as total from t_transaction"); + if (!resultSet.next()) { + log.warn("computeTotalRecords ResultSet is empty"); + return false; + } + + this.totalRecordsValue.set(resultSet.getLong(1)); + } catch (Exception e) { + log.warn("computeTotalRecords Exception", e); + return false; + } finally { + if (null != statement) { + try { + statement.close(); + } catch (SQLException e) { + } + } + + if (null != resultSet) { + try { + resultSet.close(); + } catch (SQLException e) { + } + } + } + + return true; + } + + private boolean createDB() { + Statement statement = null; + try { + statement = this.connection.createStatement(); + + String sql = this.createTableSql(); + log.info("createDB SQL:\n {}", sql); + statement.execute(sql); + this.connection.commit(); + return true; + } catch (Exception e) { + log.warn("createDB Exception", e); + return false; + } finally { + if (null != statement) { + try { + statement.close(); + } catch (SQLException e) { + log.warn("Close statement exception", e); + } + } + } + } + + private String createTableSql() { + URL resource = JDBCTransactionStore.class.getClassLoader().getResource("transaction.sql"); + String fileContent = MixAll.file2String(resource); + return fileContent; + } + + @Override + public void close() { + try { + if (this.connection != null) { + this.connection.close(); + } + } catch (SQLException e) { + } + } + + @Override + public boolean put(List<TransactionRecord> trs) { + PreparedStatement statement = null; + try { + this.connection.setAutoCommit(false); + statement = this.connection.prepareStatement("insert into t_transaction values (?, ?)"); + for (TransactionRecord tr : trs) { + statement.setLong(1, tr.getOffset()); + statement.setString(2, tr.getProducerGroup()); + statement.addBatch(); + } + int[] executeBatch = statement.executeBatch(); + 
this.connection.commit(); + this.totalRecordsValue.addAndGet(updatedRows(executeBatch)); + return true; + } catch (Exception e) { + log.warn("put Exception", e); + /* Auto-commit is off: discard the partially applied batch so a later commit cannot persist it. */ + this.rollbackQuietly(); + return false; + } finally { + if (null != statement) { + try { + statement.close(); + } catch (SQLException e) { + log.warn("Close statement exception", e); + } + } + } + } + + /** + * Sums the per-statement update counts returned by {@link Statement#executeBatch()}. + * Negative entries are JDBC status codes ({@link Statement#SUCCESS_NO_INFO}, + * {@link Statement#EXECUTE_FAILED}) rather than row counts, so they are skipped. + * + * @param rows update counts returned by executeBatch + * @return total number of rows reported as affected + */ + private long updatedRows(int[] rows) { + long res = 0; + for (int i : rows) { + if (i > 0) { + res += i; + } + } + + return res; + } + + /** + * Rolls back the pending transaction; a failure to roll back is only logged. + */ + private void rollbackQuietly() { + try { + this.connection.rollback(); + } catch (Exception e) { + log.warn("rollback Exception", e); + } + } + + /** + * Deletes the records whose offset is listed in {@code pks} as one batch and commits. + * On success the cached record count is reduced by the number of rows reported as deleted; + * on failure the exception is logged and the transaction is rolled back. + * + * @param pks offsets (primary keys) of the records to delete + */ + @Override + public void remove(List<Long> pks) { + PreparedStatement statement = null; + try { + this.connection.setAutoCommit(false); + statement = this.connection.prepareStatement("DELETE FROM t_transaction WHERE offset = ?"); + for (long pk : pks) { + statement.setLong(1, pk); + statement.addBatch(); + } + int[] executeBatch = statement.executeBatch(); + this.connection.commit(); + /* Keep totalRecords() in step with the table, mirroring what put() does on insert. */ + this.totalRecordsValue.addAndGet(-updatedRows(executeBatch)); + } catch (Exception e) { + log.warn("remove Exception", e); + this.rollbackQuietly(); + } finally { + if (null != statement) { + try { + statement.close(); + } catch (SQLException e) { + log.warn("Close statement exception", e); + } + } + } + } + + @Override + public List<TransactionRecord> traverse(long pk, int nums) { + return null; + } + + @Override + public long totalRecords() { + return this.totalRecordsValue.get(); + } + + @Override + public long minPK() { + return 0; + } + + @Override + public long maxPK() { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java new file mode 100644 index 0000000..1244cfc --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java @@ -0,0 +1,65 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.broker.transaction.jdbc; + +public class JDBCTransactionStoreConfig { + private String jdbcDriverClass = "com.mysql.jdbc.Driver"; + private String jdbcURL = "jdbc:mysql://xxx.xxx.xxx.xxx:1000/xxx?useUnicode=true&characterEncoding=UTF-8"; + private String jdbcUser = "xxx"; + private String jdbcPassword = "xxx"; + + + public String getJdbcDriverClass() { + return jdbcDriverClass; + } + + + public void setJdbcDriverClass(String jdbcDriverClass) { + this.jdbcDriverClass = jdbcDriverClass; + } + + + public String getJdbcURL() { + return jdbcURL; + } + + + public void setJdbcURL(String jdbcURL) { + this.jdbcURL = jdbcURL; + } + + + public String getJdbcUser() { + return jdbcUser; + } + + + public void setJdbcUser(String jdbcUser) { + this.jdbcUser = jdbcUser; + } + + + public String getJdbcPassword() { + return jdbcPassword; + } + + + public void setJdbcPassword(String jdbcPassword) { + this.jdbcPassword = jdbcPassword; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/resources/transaction.sql ---------------------------------------------------------------------- diff --git 
a/rocketmq-broker/src/main/resources/transaction.sql b/rocketmq-broker/src/main/resources/transaction.sql new file mode 100644 index 0000000..aaefe43 --- /dev/null +++ b/rocketmq-broker/src/main/resources/transaction.sql @@ -0,0 +1,4 @@ +CREATE TABLE t_transaction( + offset NUMERIC(20) PRIMARY KEY, + producerGroup VARCHAR(64) +) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java new file mode 100644 index 0000000..34ebfa5 --- /dev/null +++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package com.alibaba.rocketmq.broker.api; + +import com.alibaba.rocketmq.broker.BrokerController; +import com.alibaba.rocketmq.client.hook.SendMessageContext; +import com.alibaba.rocketmq.client.impl.CommunicationMode; +import com.alibaba.rocketmq.client.impl.MQClientAPIImpl; +import com.alibaba.rocketmq.client.producer.SendResult; +import com.alibaba.rocketmq.common.BrokerConfig; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.common.message.MessageDecoder; +import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader; +import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; +import com.alibaba.rocketmq.remoting.netty.NettyServerConfig; +import com.alibaba.rocketmq.store.config.MessageStoreConfig; +import org.junit.Test; + + +/** + * @author shijia.wxr + */ +public class SendMessageTest { + @Test + public void test_sendMessage() throws Exception { + BrokerController brokerController = new BrokerController(// + new BrokerConfig(), // + new NettyServerConfig(), // + new NettyClientConfig(), // + new MessageStoreConfig()); + boolean initResult = brokerController.initialize(); + System.out.println("initialize " + initResult); + + brokerController.start(); + + MQClientAPIImpl client = new MQClientAPIImpl(new NettyClientConfig(), null, null, null); + client.start(); + + for (int i = 0; i < 100; i++) { + String topic = "UnitTestTopic_" + i % 3; + Message msg = new Message(topic, "TAG1 TAG2", "100200300", ("Hello, Nice world\t" + i).getBytes()); + msg.setDelayTimeLevel(i % 3 + 1); + + try { + SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); + requestHeader.setProducerGroup("abc"); + requestHeader.setTopic(msg.getTopic()); + requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC); + requestHeader.setDefaultTopicQueueNums(4); + requestHeader.setQueueId(i % 4); + 
requestHeader.setSysFlag(0); + requestHeader.setBornTimestamp(System.currentTimeMillis()); + requestHeader.setFlag(msg.getFlag()); + requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); + + SendResult result = client.sendMessage("127.0.0.1:10911", "brokerName", msg, requestHeader, 1000 * 5, + CommunicationMode.SYNC, new SendMessageContext(), null); + System.out.println(i + "\t" + result); + } catch (Exception e) { + e.printStackTrace(); + } + } + + client.shutdown(); + + brokerController.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java new file mode 100644 index 0000000..55844eb --- /dev/null +++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * $Id: ConsumerOffsetManagerTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package com.alibaba.rocketmq.broker.offset; + +import com.alibaba.rocketmq.broker.BrokerController; +import com.alibaba.rocketmq.common.BrokerConfig; +import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; +import com.alibaba.rocketmq.remoting.netty.NettyServerConfig; +import com.alibaba.rocketmq.store.config.MessageStoreConfig; +import org.junit.Test; + +import java.util.Random; + + +/** + * @author shijia.wxr + */ +public class ConsumerOffsetManagerTest { + @Test + public void test_flushConsumerOffset() throws Exception { + BrokerController brokerController = new BrokerController(// + new BrokerConfig(), // + new NettyServerConfig(), // + new NettyClientConfig(), // + new MessageStoreConfig()); + boolean initResult = brokerController.initialize(); + System.out.println("initialize " + initResult); + brokerController.start(); + + ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController); + + Random random = new Random(); + + for (int i = 0; i < 100; i++) { + String group = "DIANPU_GROUP_" + i; + for (int id = 0; id < 16; id++) { + consumerOffsetManager.commitOffset(null, group, "TOPIC_A", id, + random.nextLong() % 1024 * 1024 * 1024); + consumerOffsetManager.commitOffset(null, group, "TOPIC_B", id, + random.nextLong() % 1024 * 1024 * 1024); + consumerOffsetManager.commitOffset(null, group, "TOPIC_C", id, + random.nextLong() % 1024 * 1024 * 1024); + } + } + + consumerOffsetManager.persist(); + + brokerController.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java 
b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java new file mode 100644 index 0000000..9edd02e --- /dev/null +++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * $Id: TopicConfigManagerTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package com.alibaba.rocketmq.broker.topic; + +import com.alibaba.rocketmq.broker.BrokerController; +import com.alibaba.rocketmq.common.BrokerConfig; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.TopicConfig; +import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; +import com.alibaba.rocketmq.remoting.netty.NettyServerConfig; +import com.alibaba.rocketmq.store.config.MessageStoreConfig; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + + +/** + * @author shijia.wxr + */ +public class TopicConfigManagerTest { + @Test + public void test_flushTopicConfig() throws Exception { + BrokerController brokerController = new BrokerController(// + new BrokerConfig(), // + new NettyServerConfig(), // + new NettyClientConfig(), // + new MessageStoreConfig()); + boolean initResult = brokerController.initialize(); + System.out.println("initialize " + initResult); + brokerController.start(); + + TopicConfigManager topicConfigManager = new TopicConfigManager(brokerController); + + TopicConfig topicConfig = + topicConfigManager.createTopicInSendMessageMethod("TestTopic_SEND", MixAll.DEFAULT_TOPIC, + null, 4, 0); + assertTrue(topicConfig != null); + + System.out.println(topicConfig); + + for (int i = 0; i < 10; i++) { + String topic = "UNITTEST-" + i; + topicConfig = + topicConfigManager + .createTopicInSendMessageMethod(topic, MixAll.DEFAULT_TOPIC, null, 4, 0); + assertTrue(topicConfig != null); + } + + topicConfigManager.persist(); + + brokerController.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/deploy.bat ---------------------------------------------------------------------- diff --git a/rocketmq-client/deploy.bat b/rocketmq-client/deploy.bat new file mode 100644 index 0000000..f778070 --- /dev/null +++ b/rocketmq-client/deploy.bat @@ -0,0 +1 @@ +mvn 
-Dmaven.test.skip=true deploy -Pclient-shade \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/install.bat ---------------------------------------------------------------------- diff --git a/rocketmq-client/install.bat b/rocketmq-client/install.bat new file mode 100644 index 0000000..87bf456 --- /dev/null +++ b/rocketmq-client/install.bat @@ -0,0 +1,2 @@ +mvn -Dmaven.test.skip=true clean package install -Pclient-shade -U + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/pom.xml ---------------------------------------------------------------------- diff --git a/rocketmq-client/pom.xml b/rocketmq-client/pom.xml new file mode 100644 index 0000000..63a6114 --- /dev/null +++ b/rocketmq-client/pom.xml @@ -0,0 +1,97 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>com.alibaba.rocketmq</groupId> + <artifactId>rocketmq-all</artifactId> + <version>4.0.0-SNAPSHOT</version> + </parent> + + + <modelVersion>4.0.0</modelVersion> + <packaging>jar</packaging> + <artifactId>rocketmq-client</artifactId> + <name>rocketmq-client ${project.version}</name> + + <profiles> + <profile> + <id>client-shade</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.3</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + </execution> + </executions> + <configuration> + <keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope> + <promoteTransitiveDependencies>false</promoteTransitiveDependencies> + <createDependencyReducedPom>true</createDependencyReducedPom> + <minimizeJar>false</minimizeJar> + <createSourcesJar>true</createSourcesJar> + <artifactSet> + <includes> + <include>com.alibaba:fastjson</include> + <include>io.netty:netty-all</include> + <include>com.alibaba.rocketmq:rocketmq-client</include> + <include>com.alibaba.rocketmq:rocketmq-common</include> + <include>com.alibaba.rocketmq:rocketmq-remoting</include> + </includes> + </artifactSet> + <relocations> + <relocation> + <pattern>io.netty</pattern> + <shadedPattern>com.alibaba.rocketmq.shade.io.netty</shadedPattern> + </relocation> + <relocation> + <pattern>com.alibaba.fastjson</pattern> + <shadedPattern>com.alibaba.rocketmq.shade.com.alibaba.fastjson</shadedPattern> + </relocation> + </relocations> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + 
</dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>rocketmq-common</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java new file mode 100644 index 0000000..4d80564 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.client; + +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.remoting.common.RemotingUtil; + + +/** + * Client Common configuration + * + * @author shijia.wxr + * @author vongosling + */ +public class ClientConfig { + public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel"; + private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)); + private String clientIP = RemotingUtil.getLocalAddress(); + private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT"); + private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors(); + /** + * Interval for pulling topic information from the name server + */ + private int pollNameServerInteval = 1000 * 30; + /** + * Heartbeat interval in milliseconds with message broker + */ + private int heartbeatBrokerInterval = 1000 * 30; + /** + * Offset persistent interval for consumer + */ + private int persistConsumerOffsetInterval = 1000 * 5; + private boolean unitMode = false; + private String unitName; + private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true")); + + public String buildMQClientId() { + StringBuilder sb = new StringBuilder(); + sb.append(this.getClientIP()); + + sb.append("@"); + sb.append(this.getInstanceName()); + if (!UtilAll.isBlank(this.unitName)) { + sb.append("@"); + sb.append(this.unitName); + } + + return sb.toString(); + } + + public String getClientIP() { + return clientIP; + } + + public void setClientIP(String clientIP) { + this.clientIP = clientIP; + } + + public String getInstanceName() { + return instanceName; + } + + public void setInstanceName(String instanceName) { + this.instanceName = instanceName; + } + + public void changeInstanceNameToPID() { + if 
(this.instanceName.equals("DEFAULT")) { + this.instanceName = String.valueOf(UtilAll.getPid()); + } + } + + public void resetClientConfig(final ClientConfig cc) { + this.namesrvAddr = cc.namesrvAddr; + this.clientIP = cc.clientIP; + this.instanceName = cc.instanceName; + this.clientCallbackExecutorThreads = cc.clientCallbackExecutorThreads; + this.pollNameServerInteval = cc.pollNameServerInteval; + this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval; + this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval; + this.unitMode = cc.unitMode; + this.unitName = cc.unitName; + this.vipChannelEnabled = cc.vipChannelEnabled; + } + + public ClientConfig cloneClientConfig() { + ClientConfig cc = new ClientConfig(); + cc.namesrvAddr = namesrvAddr; + cc.clientIP = clientIP; + cc.instanceName = instanceName; + cc.clientCallbackExecutorThreads = clientCallbackExecutorThreads; + cc.pollNameServerInteval = pollNameServerInteval; + cc.heartbeatBrokerInterval = heartbeatBrokerInterval; + cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval; + cc.unitMode = unitMode; + cc.unitName = unitName; + cc.vipChannelEnabled = vipChannelEnabled; + return cc; + } + + public String getNamesrvAddr() { + return namesrvAddr; + } + + public void setNamesrvAddr(String namesrvAddr) { + this.namesrvAddr = namesrvAddr; + } + + public int getClientCallbackExecutorThreads() { + return clientCallbackExecutorThreads; + } + + + public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) { + this.clientCallbackExecutorThreads = clientCallbackExecutorThreads; + } + + + public int getPollNameServerInteval() { + return pollNameServerInteval; + } + + + public void setPollNameServerInteval(int pollNameServerInteval) { + this.pollNameServerInteval = pollNameServerInteval; + } + + + public int getHeartbeatBrokerInterval() { + return heartbeatBrokerInterval; + } + + + public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) { + 
this.heartbeatBrokerInterval = heartbeatBrokerInterval; + } + + + public int getPersistConsumerOffsetInterval() { + return persistConsumerOffsetInterval; + } + + + public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) { + this.persistConsumerOffsetInterval = persistConsumerOffsetInterval; + } + + + public String getUnitName() { + return unitName; + } + + + public void setUnitName(String unitName) { + this.unitName = unitName; + } + + + public boolean isUnitMode() { + return unitMode; + } + + + public void setUnitMode(boolean unitMode) { + this.unitMode = unitMode; + } + + + public boolean isVipChannelEnabled() { + return vipChannelEnabled; + } + + + public void setVipChannelEnabled(final boolean vipChannelEnabled) { + this.vipChannelEnabled = vipChannelEnabled; + } + + + @Override + public String toString() { + return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName + + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInteval=" + pollNameServerInteval + + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled=" + + vipChannelEnabled + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java new file mode 100644 index 0000000..4e202e9 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client; + +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.remoting.exception.RemotingException; + + +/** + * Base interface for MQ management + * + * @author shijia.wxr + */ +public interface MQAdmin { + /** + * Creates a topic + * + * @param key + * accesskey + * @param newTopic + * topic name + * @param queueNum + * topic's queue number + * + * @throws MQClientException + */ + void createTopic(final String key, final String newTopic, final int queueNum) + throws MQClientException; + + + /** + * Creates a topic + * + * @param key + * accesskey + * @param newTopic + * topic name + * @param queueNum + * topic's queue number + * @param topicSysFlag + * topic system flag + * + * @throws MQClientException + */ + void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) + throws MQClientException; + + + /** + * Gets the message queue offset according to some time in milliseconds<br> + * be cautious to call because of more IO overhead + * + * @param mq + * Instance of MessageQueue + * @param timestamp + * from 
when in milliseconds. + * + * @return offset + * + * @throws MQClientException + */ + long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException; + + + /** + * Gets the max offset + * + * @param mq + * Instance of MessageQueue + * + * @return the max offset + * + * @throws MQClientException + */ + long maxOffset(final MessageQueue mq) throws MQClientException; + + + /** + * Gets the minimum offset + * + * @param mq + * Instance of MessageQueue + * + * @return the minimum offset + * + * @throws MQClientException + */ + long minOffset(final MessageQueue mq) throws MQClientException; + + + /** + * Gets the earliest stored message time + * + * @param mq + * Instance of MessageQueue + * + * @return the time in microseconds + * + * @throws MQClientException + */ + long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException; + + + /** + * Query message according to message id + * + * @param offsetMsgId + * message id + * + * @return message + * + * @throws InterruptedException + * @throws MQBrokerException + * @throws RemotingException + * @throws MQClientException + */ + MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException; + + + /** + * Query messages + * + * @param topic + * message topic + * @param key + * message key index word + * @param maxNum + * max message number + * @param begin + * from when + * @param end + * to when + * + * @return Instance of QueryResult + * + * @throws MQClientException + * @throws InterruptedException + */ + QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, + final long end) throws MQClientException, InterruptedException; + + /** + + * @param topic + * @param msgId + * @return The {@code MessageExt} of given msgId + * @throws RemotingException + * @throws MQBrokerException + * @throws InterruptedException + * @throws MQClientException + */ + MessageExt 
viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java new file mode 100644 index 0000000..5934b49 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.client; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; +import com.alibaba.rocketmq.client.log.ClientLogger; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; +import org.slf4j.Logger; + +import java.util.Set; +import java.util.TreeSet; + + +/** + * @author shijia.wxr + */ +public class MQHelper { + public static void resetOffsetByTimestamp( + final MessageModel messageModel, + final String consumerGroup, + final String topic, + final long timestamp) throws Exception { + resetOffsetByTimestamp(messageModel, "DEFAULT", consumerGroup, topic, timestamp); + } + + /** + * Reset consumer topic offset according to time + * + * @param messageModel + * which model + * @param instanceName + * which instance + * @param consumerGroup + * consumer group + * @param topic + * topic + * @param timestamp + * time + * + * @throws Exception + */ + public static void resetOffsetByTimestamp( + final MessageModel messageModel, + final String instanceName, + final String consumerGroup, + final String topic, + final long timestamp) throws Exception { + final Logger log = ClientLogger.getLog(); + + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup); + consumer.setInstanceName(instanceName); + consumer.setMessageModel(messageModel); + consumer.start(); + + Set<MessageQueue> mqs = null; + try { + mqs = consumer.fetchSubscribeMessageQueues(topic); + if (mqs != null && !mqs.isEmpty()) { + TreeSet<MessageQueue> mqsNew = new TreeSet<MessageQueue>(mqs); + for (MessageQueue mq : mqsNew) { + long offset = consumer.searchOffset(mq, timestamp); + if (offset >= 0) { + consumer.updateConsumeOffset(mq, offset); + log.info("resetOffsetByTimestamp updateConsumeOffset success, {} {} {}", + consumerGroup, offset, mq); + } + } + } + } catch (Exception e) { + log.warn("resetOffsetByTimestamp Exception", e); + throw e; + } finally { + if (mqs != null) { + 
consumer.getDefaultMQPullConsumerImpl().getOffsetStore().persistAll(mqs); + } + consumer.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java new file mode 100644 index 0000000..43c8106 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.client; + +import com.alibaba.rocketmq.common.message.MessageExt; + +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class QueryResult { + private final long indexLastUpdateTimestamp; + private final List<MessageExt> messageList; + + + public QueryResult(long indexLastUpdateTimestamp, List<MessageExt> messageList) { + this.indexLastUpdateTimestamp = indexLastUpdateTimestamp; + this.messageList = messageList; + } + + + public long getIndexLastUpdateTimestamp() { + return indexLastUpdateTimestamp; + } + + + public List<MessageExt> getMessageList() { + return messageList; + } + + + @Override + public String toString() { + return "QueryResult [indexLastUpdateTimestamp=" + indexLastUpdateTimestamp + ", messageList=" + + messageList + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/Validators.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/Validators.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/Validators.java new file mode 100644 index 0000000..203aae0 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/Validators.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.client; + +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.client.producer.DefaultMQProducer; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.common.protocol.ResponseCode; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +/** + * Common Validator + * + * @author manhong.yqd + */ +public class Validators { + public static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$"; + public static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR); + public static final int CHARACTER_MAX_LENGTH = 255; + + /** + * @param origin + * @param patternStr + * + * @return The resulting {@code String} + */ + public static String getGroupWithRegularExpression(String origin, String patternStr) { + Pattern pattern = Pattern.compile(patternStr); + Matcher matcher = pattern.matcher(origin); + while (matcher.find()) { + return matcher.group(0); + } + return null; + } + + /** + * Validate group + * + * @param group + * + * @throws com.alibaba.rocketmq.client.exception.MQClientException + */ + public static void checkGroup(String group) throws MQClientException { + if (UtilAll.isBlank(group)) { + throw new MQClientException("the specified group is blank", null); + } + if (!regularExpressionMatcher(group, PATTERN)) { + throw new MQClientException(String.format( + "the specified group[%s] contains illegal characters, allowing only %s", group, + 
VALID_PATTERN_STR), null); + } + if (group.length() > CHARACTER_MAX_LENGTH) { + throw new MQClientException("the specified group is longer than group max length 255.", null); + } + } + + /** + * @param origin + * @param pattern + * + * @return <tt>true</tt> if, and only if, the entire origin sequence + * matches this matcher's pattern + */ + public static boolean regularExpressionMatcher(String origin, Pattern pattern) { + if (pattern == null) { + return true; + } + Matcher matcher = pattern.matcher(origin); + return matcher.matches(); + } + + /** + * Validate message + * + * @param msg + * @param defaultMQProducer + * + * @throws com.alibaba.rocketmq.client.exception.MQClientException + */ + public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) + throws MQClientException { + if (null == msg) { + throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); + } + // topic + Validators.checkTopic(msg.getTopic()); + // body + if (null == msg.getBody()) { + throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); + } + + if (0 == msg.getBody().length) { + throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); + } + + if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { + throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, + "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); + } + } + + /** + * Validate topic + * + * @param topic + * + * @throws com.alibaba.rocketmq.client.exception.MQClientException + */ + public static void checkTopic(String topic) throws MQClientException { + if (UtilAll.isBlank(topic)) { + throw new MQClientException("the specified topic is blank", null); + } + + if (!regularExpressionMatcher(topic, PATTERN)) { + throw new MQClientException(String.format( + "the specified topic[%s] contains illegal characters, allowing only %s", topic, + VALID_PATTERN_STR), null); + } + + if 
(topic.length() > CHARACTER_MAX_LENGTH) { + throw new MQClientException("the specified topic is longer than topic max length 255.", null); + } + + //whether the same with system reserved keyword + if (topic.equals(MixAll.DEFAULT_TOPIC)) { + throw new MQClientException( + String.format("the topic[%s] is conflict with default topic.", topic), null); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java new file mode 100644 index 0000000..071a872 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package com.alibaba.rocketmq.client.admin;
+
+/**
+ * Interface that declares no methods.
+ * NOTE(review): presumably a marker type for internal admin clients; confirm
+ * against its implementors, which are not visible here.
+ *
+ * @author shijia.wxr
+ */
+public interface MQAdminExtInner {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java
new file mode 100644
index 0000000..88d0eea
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.client.common;
+
+/**
+ * Holder of integer error-code constants in the 10001-10005 range.
+ * NOTE(review): the constant names suggest client-side connection, timeout and
+ * lookup failures; where each code is raised is not visible in this file.
+ */
+public class ClientErrorCode {
+    public static final int CONNECT_BROKER_EXCEPTION = 10001;
+    public static final int ACCESS_BROKER_TIMEOUT = 10002;
+    public static final int BROKER_NOT_EXIST_EXCEPTION = 10003;
+    public static final int NO_NAME_SERVER_EXCEPTION = 10004;
+    public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005;
+}
\ No newline at end of file
