http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java deleted file mode 100644 index d954a46..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ /dev/null @@ -1,203 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.subscription; - -import com.alibaba.rocketmq.broker.BrokerController; -import com.alibaba.rocketmq.broker.BrokerPathConfigHelper; -import com.alibaba.rocketmq.common.ConfigManager; -import com.alibaba.rocketmq.common.DataVersion; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig; -import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; - - -/** - * @author shijia.wxr - */ -public class SubscriptionGroupManager extends ConfigManager { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - - private final ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable = - new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024); - private final DataVersion dataVersion = new DataVersion(); - private transient BrokerController brokerController; - - - public SubscriptionGroupManager() { - this.init(); - } - - private void init() { - { - SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); - subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP); - this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig); - } - - { - SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); - subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP); - this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig); - } - - { - SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); - subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP); - this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig); - } - - { - SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); - subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP); - subscriptionGroupConfig.setConsumeBroadcastEnable(true); - this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig); - } - - { - SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); - subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP); - subscriptionGroupConfig.setConsumeBroadcastEnable(true); - this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig); - } - - { - SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); - subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP); - subscriptionGroupConfig.setConsumeBroadcastEnable(true); - this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig); - } - - { - SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); - subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP); - subscriptionGroupConfig.setConsumeBroadcastEnable(true); - this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig); - } - } - - - public SubscriptionGroupManager(BrokerController brokerController) { - this.brokerController = brokerController; - this.init(); - } - - - public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) { - SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config); - if (old != null) { - log.info("update subscription group config, old: " + old + " new: " + config); - } else { - log.info("create new subscription group, " + config); - } - - this.dataVersion.nextVersion(); - - this.persist(); - } - - public void disableConsume(final String groupName) { - SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName); - if (old != null) { - old.setConsumeEnable(false); - this.dataVersion.nextVersion(); - } - } - - - public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) { - SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group); - if (null == subscriptionGroupConfig) { - if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) { - subscriptionGroupConfig = new SubscriptionGroupConfig(); - subscriptionGroupConfig.setGroupName(group); - SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig); - if (null == preConfig) { - log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString()); - } - this.dataVersion.nextVersion(); - this.persist(); - } - } - - return subscriptionGroupConfig; - } - - - @Override - public String encode() { - return this.encode(false); - } - - @Override - public String configFilePath() { - //return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); - return BrokerPathConfigHelper.getSubscriptionGroupPath(System.getProperty("user.home") + File.separator + "store"); - } - - @Override - public void decode(String jsonString) { - if (jsonString != null) { - SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class); - if (obj != null) { - this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable); - this.dataVersion.assignNewOne(obj.dataVersion); - this.printLoadDataWhenFirstBoot(obj); - } - } - } - - public String encode(final boolean prettyFormat) { - return RemotingSerializable.toJson(this, prettyFormat); - } - - private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) { - Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator(); - while (it.hasNext()) { - Entry<String, SubscriptionGroupConfig> next = it.next(); - log.info("load exist subscription group, {}", next.getValue().toString()); - } - } - - public ConcurrentHashMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() { - return subscriptionGroupTable; - } - - - public DataVersion getDataVersion() { - return dataVersion; - } - - - public void deleteSubscriptionGroupConfig(final String groupName) { - SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName); - if (old != null) { - log.info("delete subscription group OK, subscription group: " + old); - this.dataVersion.nextVersion(); - this.persist(); - } else { - log.warn("delete subscription group failed, subscription group: " + old + " not exist"); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java deleted file mode 100644 index 94d7e9f..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java +++ /dev/null @@ -1,440 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.topic; - -import com.alibaba.rocketmq.broker.BrokerController; -import com.alibaba.rocketmq.broker.BrokerPathConfigHelper; -import com.alibaba.rocketmq.common.ConfigManager; -import com.alibaba.rocketmq.common.DataVersion; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.TopicConfig; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.constant.PermName; -import com.alibaba.rocketmq.common.protocol.body.KVTable; -import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; -import com.alibaba.rocketmq.common.sysflag.TopicSysFlag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - - -/** - * @author shijia.wxr - */ -public class TopicConfigManager extends ConfigManager { - private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private static final long LOCK_TIMEOUT_MILLIS = 3000; - private transient final Lock lockTopicConfigTable = new ReentrantLock(); - - private final ConcurrentHashMap<String, TopicConfig> topicConfigTable = - new ConcurrentHashMap<String, TopicConfig>(1024); - private final DataVersion dataVersion = new DataVersion(); - private final Set<String> systemTopicList = new HashSet<String>(); - private transient BrokerController brokerController; - - - public TopicConfigManager() { - } - - - public TopicConfigManager(BrokerController brokerController) { - this.brokerController = brokerController; - { - // MixAll.SELF_TEST_TOPIC - String topic = MixAll.SELF_TEST_TOPIC; - TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); - topicConfig.setReadQueueNums(1); - topicConfig.setWriteQueueNums(1); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); - } - { - // MixAll.DEFAULT_TOPIC - if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { - String topic = MixAll.DEFAULT_TOPIC; - TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); - topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig() - .getDefaultTopicQueueNums()); - topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig() - .getDefaultTopicQueueNums()); - int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE; - topicConfig.setPerm(perm); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); - } - } - { - // MixAll.BENCHMARK_TOPIC - String topic = MixAll.BENCHMARK_TOPIC; - TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); - topicConfig.setReadQueueNums(1024); - topicConfig.setWriteQueueNums(1024); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); - } - { - - String topic = this.brokerController.getBrokerConfig().getBrokerClusterName(); - TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); - int perm = PermName.PERM_INHERIT; - if (this.brokerController.getBrokerConfig().isClusterTopicEnable()) { - perm |= PermName.PERM_READ | PermName.PERM_WRITE; - } - topicConfig.setPerm(perm); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); - } - { - - String topic = this.brokerController.getBrokerConfig().getBrokerName(); - TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); - int perm = PermName.PERM_INHERIT; - if (this.brokerController.getBrokerConfig().isBrokerTopicEnable()) { - perm |= PermName.PERM_READ | PermName.PERM_WRITE; - } - topicConfig.setReadQueueNums(1); - topicConfig.setWriteQueueNums(1); - topicConfig.setPerm(perm); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); - } - { - // MixAll.OFFSET_MOVED_EVENT - String topic = MixAll.OFFSET_MOVED_EVENT; - TopicConfig topicConfig = new TopicConfig(topic); - this.systemTopicList.add(topic); - topicConfig.setReadQueueNums(1); - topicConfig.setWriteQueueNums(1); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); - } - } - - - public boolean isSystemTopic(final String topic) { - return this.systemTopicList.contains(topic); - } - - - public Set<String> getSystemTopic() { - return this.systemTopicList; - } - - - public boolean isTopicCanSendMessage(final String topic) { - return !topic.equals(MixAll.DEFAULT_TOPIC); - } - - - public TopicConfig selectTopicConfig(final String topic) { - return this.topicConfigTable.get(topic); - } - - - public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic, - final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) { - TopicConfig topicConfig = null; - boolean createNew = false; - - try { - if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - try { - topicConfig = this.topicConfigTable.get(topic); - if (topicConfig != null) - return topicConfig; - - TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic); - if (defaultTopicConfig != null) { - if (defaultTopic.equals(MixAll.DEFAULT_TOPIC)) { - if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { - defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE); - } - } - - if (PermName.isInherited(defaultTopicConfig.getPerm())) { - topicConfig = new TopicConfig(topic); - - int queueNums = - clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig - .getWriteQueueNums() : clientDefaultTopicQueueNums; - - if (queueNums < 0) { - queueNums = 0; - } - - topicConfig.setReadQueueNums(queueNums); - topicConfig.setWriteQueueNums(queueNums); - int perm = defaultTopicConfig.getPerm(); - perm &= ~PermName.PERM_INHERIT; - topicConfig.setPerm(perm); - topicConfig.setTopicSysFlag(topicSysFlag); - topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType()); - } else { - LOG.warn("create new topic failed, because the default topic[" + defaultTopic - + "] no perm, " + defaultTopicConfig.getPerm() + " producer: " - + remoteAddress); - } - } else { - LOG.warn("create new topic failed, because the default topic[" + defaultTopic - + "] not exist." + " producer: " + remoteAddress); - } - - if (topicConfig != null) { - LOG.info("create new topic by default topic[" + defaultTopic + "], " + topicConfig - + " producer: " + remoteAddress); - - this.topicConfigTable.put(topic, topicConfig); - - this.dataVersion.nextVersion(); - - createNew = true; - - this.persist(); - } - } finally { - this.lockTopicConfigTable.unlock(); - } - } - } catch (InterruptedException e) { - LOG.error("createTopicInSendMessageMethod exception", e); - } - - if (createNew) { - this.brokerController.registerBrokerAll(false, true); - } - - return topicConfig; - } - - public TopicConfig createTopicInSendMessageBackMethod( - final String topic, - final int clientDefaultTopicQueueNums, - final int perm, - final int topicSysFlag) { - TopicConfig topicConfig = this.topicConfigTable.get(topic); - if (topicConfig != null) - return topicConfig; - - boolean createNew = false; - - try { - if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - try { - topicConfig = this.topicConfigTable.get(topic); - if (topicConfig != null) - return topicConfig; - - topicConfig = new TopicConfig(topic); - topicConfig.setReadQueueNums(clientDefaultTopicQueueNums); - topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums); - topicConfig.setPerm(perm); - topicConfig.setTopicSysFlag(topicSysFlag); - - LOG.info("create new topic {}", topicConfig); - this.topicConfigTable.put(topic, topicConfig); - createNew = true; - this.dataVersion.nextVersion(); - this.persist(); - } finally { - this.lockTopicConfigTable.unlock(); - } - } - } catch (InterruptedException e) { - LOG.error("createTopicInSendMessageBackMethod exception", e); - } - - if (createNew) { - this.brokerController.registerBrokerAll(false, true); - } - - return topicConfig; - } - - public void updateTopicUnitFlag(final String topic, final boolean unit) { - - TopicConfig topicConfig = this.topicConfigTable.get(topic); - if (topicConfig != null) { - int oldTopicSysFlag = topicConfig.getTopicSysFlag(); - if (unit) { - topicConfig.setTopicSysFlag(TopicSysFlag.setUnitFlag(oldTopicSysFlag)); - } else { - topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitFlag(oldTopicSysFlag)); - } - - LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag, - topicConfig.getTopicSysFlag()); - - this.topicConfigTable.put(topic, topicConfig); - - this.dataVersion.nextVersion(); - - this.persist(); - this.brokerController.registerBrokerAll(false, true); - } - } - - public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) { - TopicConfig topicConfig = this.topicConfigTable.get(topic); - if (topicConfig != null) { - int oldTopicSysFlag = topicConfig.getTopicSysFlag(); - if (hasUnitSub) { - topicConfig.setTopicSysFlag(TopicSysFlag.setUnitSubFlag(oldTopicSysFlag)); - } - - LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag, - topicConfig.getTopicSysFlag()); - - this.topicConfigTable.put(topic, topicConfig); - - this.dataVersion.nextVersion(); - - this.persist(); - this.brokerController.registerBrokerAll(false, true); - } - } - - public void updateTopicConfig(final TopicConfig topicConfig) { - TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); - if (old != null) { - LOG.info("update topic config, old: " + old + " new: " + topicConfig); - } else { - LOG.info("create new topic, " + topicConfig); - } - - this.dataVersion.nextVersion(); - - this.persist(); - } - - - public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) { - - if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) { - boolean isChange = false; - Set<String> orderTopics = orderKVTableFromNs.getTable().keySet(); - for (String topic : orderTopics) { - TopicConfig topicConfig = this.topicConfigTable.get(topic); - if (topicConfig != null && !topicConfig.isOrder()) { - topicConfig.setOrder(true); - isChange = true; - LOG.info("update order topic config, topic={}, order={}", topic, true); - } - } - - for (Map.Entry<String, TopicConfig> entry : this.topicConfigTable.entrySet()) { - String topic = entry.getKey(); - if (!orderTopics.contains(topic)) { - TopicConfig topicConfig = entry.getValue(); - if (topicConfig.isOrder()) { - topicConfig.setOrder(false); - isChange = true; - LOG.info("update order topic config, topic={}, order={}", topic, false); - } - } - } - - if (isChange) { - this.dataVersion.nextVersion(); - this.persist(); - } - } - } - - public boolean isOrderTopic(final String topic) { - TopicConfig topicConfig = this.topicConfigTable.get(topic); - if (topicConfig == null) { - return false; - } else { - return topicConfig.isOrder(); - } - } - - public void deleteTopicConfig(final String topic) { - TopicConfig old = this.topicConfigTable.remove(topic); - if (old != null) { - LOG.info("delete topic config OK, topic: " + old); - this.dataVersion.nextVersion(); - this.persist(); - } else { - LOG.warn("delete topic config failed, topic: " + topic + " not exist"); - } - } - - public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() { - TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); - topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable); - topicConfigSerializeWrapper.setDataVersion(this.dataVersion); - return topicConfigSerializeWrapper; - } - - @Override - public String encode() { - return encode(false); - } - - @Override - public String configFilePath() { -// return BrokerPathConfigHelper.getTopicConfigPath(this.brokerController.getMessageStoreConfig() -// .getStorePathRootDir()); - return BrokerPathConfigHelper.getTopicConfigPath(System.getProperty("user.home") + File.separator + "store"); - } - - @Override - public void decode(String jsonString) { - if (jsonString != null) { - TopicConfigSerializeWrapper topicConfigSerializeWrapper = - TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class); - if (topicConfigSerializeWrapper != null) { - this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable()); - this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion()); - this.printLoadDataWhenFirstBoot(topicConfigSerializeWrapper); - } - } - } - - public String encode(final boolean prettyFormat) { - TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); - topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable); - topicConfigSerializeWrapper.setDataVersion(this.dataVersion); - return topicConfigSerializeWrapper.toJson(prettyFormat); - } - - private void printLoadDataWhenFirstBoot(final TopicConfigSerializeWrapper tcs) { - Iterator<Entry<String, TopicConfig>> it = tcs.getTopicConfigTable().entrySet().iterator(); - while (it.hasNext()) { - Entry<String, TopicConfig> next = it.next(); - LOG.info("load exist local topic, {}", next.getValue().toString()); - } - } - - public DataVersion getDataVersion() { - return dataVersion; - } - - public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() { - return topicConfigTable; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java b/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java deleted file mode 100644 index 4328cf8..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.broker.transaction; - -public class TransactionRecord { - // Commit Log Offset - private long offset; - private String producerGroup; - - - public long getOffset() { - return offset; - } - - - public void setOffset(long offset) { - this.offset = offset; - } - - - public String getProducerGroup() { - return producerGroup; - } - - - public void setProducerGroup(String producerGroup) { - this.producerGroup = producerGroup; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java b/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java deleted file mode 100644 index 9d977ab..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.broker.transaction; - -import java.util.List; - - -public interface TransactionStore { - public boolean open(); - - - public void close(); - - - public boolean put(final List<TransactionRecord> trs); - - - public void remove(final List<Long> pks); - - - public List<TransactionRecord> traverse(final long pk, final int nums); - - - public long totalRecords(); - - - public long minPK(); - - - public long maxPK(); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java b/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java deleted file mode 100644 index 47de33b..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java +++ /dev/null @@ -1,240 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.broker.transaction.jdbc; - -import com.alibaba.rocketmq.broker.transaction.TransactionRecord; -import com.alibaba.rocketmq.broker.transaction.TransactionStore; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.constant.LoggerName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.URL; -import java.sql.*; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicLong; - - -public class JDBCTransactionStore implements TransactionStore { - private static final Logger log = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); - private final JDBCTransactionStoreConfig jdbcTransactionStoreConfig; - private Connection connection; - private AtomicLong totalRecordsValue = new AtomicLong(0); - - public JDBCTransactionStore(JDBCTransactionStoreConfig jdbcTransactionStoreConfig) { - this.jdbcTransactionStoreConfig = jdbcTransactionStoreConfig; - } - - @Override - public boolean open() { - if (this.loadDriver()) { - Properties props = new Properties(); - props.put("user", jdbcTransactionStoreConfig.getJdbcUser()); - props.put("password", jdbcTransactionStoreConfig.getJdbcPassword()); - - try { - this.connection = - DriverManager.getConnection(this.jdbcTransactionStoreConfig.getJdbcURL(), props); - - this.connection.setAutoCommit(false); - - - if (!this.computeTotalRecords()) { - return this.createDB(); - } - - return true; - } catch (SQLException e) { - log.info("Create JDBC Connection Exeption", e); - } - } - - return false; - } - - private boolean loadDriver() { - try { - Class.forName(this.jdbcTransactionStoreConfig.getJdbcDriverClass()).newInstance(); - log.info("Loaded the appropriate driver, {}", - this.jdbcTransactionStoreConfig.getJdbcDriverClass()); - return true; - } catch (Exception e) { - log.info("Loaded the appropriate driver Exception", e); - } - - return false; - } - - private boolean computeTotalRecords() { - Statement statement = null; - ResultSet resultSet = null; - try { - statement = this.connection.createStatement(); - - resultSet = statement.executeQuery("select count(offset) as total from t_transaction"); - if (!resultSet.next()) { - log.warn("computeTotalRecords ResultSet is empty"); - return false; - } - - this.totalRecordsValue.set(resultSet.getLong(1)); - } catch (Exception e) { - log.warn("computeTotalRecords Exception", e); - return false; - } finally { - if (null != statement) { - try { - statement.close(); - } catch (SQLException e) { - } - } - - if (null != resultSet) { - try { - resultSet.close(); - } catch (SQLException e) { - } - } - } - - return true; - } - - private boolean createDB() { - Statement statement = null; - try { - statement = this.connection.createStatement(); - - String sql = this.createTableSql(); - log.info("createDB SQL:\n {}", sql); - statement.execute(sql); - this.connection.commit(); - return true; - } catch (Exception e) { - log.warn("createDB Exception", e); - return false; - } finally { - if (null != statement) { - try { - statement.close(); - } catch (SQLException e) { - log.warn("Close statement exception", e); - } - } - } - } - - private String createTableSql() { - URL resource = JDBCTransactionStore.class.getClassLoader().getResource("transaction.sql"); - String fileContent = MixAll.file2String(resource); - return fileContent; - } - - @Override - public void close() { - try { - if (this.connection != null) { - this.connection.close(); - } - } catch (SQLException e) { - } - } - - @Override - public boolean put(List<TransactionRecord> trs) { - PreparedStatement statement = null; - try { - this.connection.setAutoCommit(false); - statement = this.connection.prepareStatement("insert into t_transaction values (?, ?)"); - for (TransactionRecord tr : trs) { - statement.setLong(1, tr.getOffset()); - statement.setString(2, tr.getProducerGroup()); - statement.addBatch(); - } - int[] executeBatch = statement.executeBatch(); - this.connection.commit(); - this.totalRecordsValue.addAndGet(updatedRows(executeBatch)); - return true; - } catch (Exception e) { - log.warn("createDB Exception", e); - return false; - } finally { - if (null != statement) { - try { - statement.close(); - } catch (SQLException e) { - log.warn("Close statement exception", e); - } - } - } - } - - private long updatedRows(int[] rows) { - long res = 0; - for (int i : rows) { - res += i; - } - - return res; - } - - @Override - public void remove(List<Long> pks) { - PreparedStatement statement = null; - try { - this.connection.setAutoCommit(false); - statement = this.connection.prepareStatement("DELETE FROM t_transaction WHERE offset = ?"); - for (long pk : pks) { - statement.setLong(1, pk); - statement.addBatch(); - } - int[] executeBatch = statement.executeBatch(); - this.connection.commit(); - } catch (Exception e) { - log.warn("createDB Exception", e); - } finally { - if (null != statement) { - try { - statement.close(); - } catch (SQLException e) { - } - } - } - } - - @Override - public List<TransactionRecord> traverse(long pk, int nums) { - return null; - } - - @Override - public long totalRecords() { - return this.totalRecordsValue.get(); - } - - @Override - public long minPK() { - return 0; - } - - @Override - public long maxPK() { - return 0; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java b/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java deleted file mode 100644 index 1244cfc..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.broker.transaction.jdbc; - -public class JDBCTransactionStoreConfig { - private String jdbcDriverClass = "com.mysql.jdbc.Driver"; - private String jdbcURL = "jdbc:mysql://xxx.xxx.xxx.xxx:1000/xxx?useUnicode=true&characterEncoding=UTF-8"; - private String jdbcUser = "xxx"; - private String jdbcPassword = "xxx"; - - - public String getJdbcDriverClass() { - return jdbcDriverClass; - } - - - public void setJdbcDriverClass(String jdbcDriverClass) { - this.jdbcDriverClass = jdbcDriverClass; - } - - - public String getJdbcURL() { - return jdbcURL; - } - - - public void setJdbcURL(String jdbcURL) { - this.jdbcURL = jdbcURL; - } - - - public String getJdbcUser() { - return jdbcUser; - } - - - public void setJdbcUser(String jdbcUser) { - this.jdbcUser = jdbcUser; - } - - - public String getJdbcPassword() { - return jdbcPassword; - } - - - public void setJdbcPassword(String jdbcPassword) { - this.jdbcPassword = jdbcPassword; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java new file mode 100644 index 0000000..c5c05f4 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -0,0 +1,773 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker; + +import org.apache.rocketmq.broker.client.*; +import org.apache.rocketmq.broker.client.net.Broker2Client; +import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager; +import org.apache.rocketmq.broker.filtersrv.FilterServerManager; +import org.apache.rocketmq.broker.latency.BrokerFastFailure; +import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor; +import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener; +import org.apache.rocketmq.broker.longpolling.PullRequestHoldService; +import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; +import org.apache.rocketmq.broker.mqtrace.SendMessageHook; +import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; +import org.apache.rocketmq.broker.out.BrokerOuterAPI; +import org.apache.rocketmq.broker.plugin.MessageStoreFactory; +import org.apache.rocketmq.broker.plugin.MessageStorePluginContext; +import org.apache.rocketmq.broker.processor.*; +import org.apache.rocketmq.broker.slave.SlaveSynchronize; +import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; +import org.apache.rocketmq.broker.topic.TopicConfigManager; +import org.apache.rocketmq.common.*; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.stats.MomentStatsItem; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.RemotingServer; +import org.apache.rocketmq.remoting.netty.*; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.MessageArrivingListener; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.config.BrokerRole; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.stats.BrokerStats; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; + + +/** + * @author shijia.wxr + */ +public class BrokerController { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME); + private static final Logger LOG_WATER_MARK = LoggerFactory.getLogger(LoggerName.WATER_MARK_LOGGER_NAME); + private final BrokerConfig brokerConfig; + private final NettyServerConfig nettyServerConfig; + private final NettyClientConfig nettyClientConfig; + private final MessageStoreConfig messageStoreConfig; + private final ConsumerOffsetManager consumerOffsetManager; + private final ConsumerManager consumerManager; + private final ProducerManager producerManager; + private final ClientHousekeepingService clientHousekeepingService; + private final PullMessageProcessor pullMessageProcessor; + private final PullRequestHoldService pullRequestHoldService; + private final MessageArrivingListener messageArrivingListener; + private final Broker2Client broker2Client; + private final SubscriptionGroupManager subscriptionGroupManager; + private final ConsumerIdsChangeListener consumerIdsChangeListener; + private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager(); + private final BrokerOuterAPI brokerOuterAPI; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( + "BrokerControllerScheduledThread")); + private final SlaveSynchronize slaveSynchronize; + private final BlockingQueue<Runnable> sendThreadPoolQueue; + private final BlockingQueue<Runnable> pullThreadPoolQueue; + private final BlockingQueue<Runnable> clientManagerThreadPoolQueue; + private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue; + private final FilterServerManager filterServerManager; + private final BrokerStatsManager brokerStatsManager; + private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>(); + private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>(); + private MessageStore messageStore; + private RemotingServer remotingServer; + private RemotingServer fastRemotingServer; + private TopicConfigManager topicConfigManager; + private ExecutorService sendMessageExecutor; + private ExecutorService pullMessageExecutor; + private ExecutorService adminBrokerExecutor; + private ExecutorService clientManageExecutor; + private ExecutorService consumerManageExecutor; + private boolean updateMasterHAServerAddrPeriodically = false; + private BrokerStats brokerStats; + private InetSocketAddress storeHost; + private BrokerFastFailure brokerFastFailure; + private Configuration configuration; + + public BrokerController(// + final BrokerConfig brokerConfig, // + final NettyServerConfig nettyServerConfig, // + final NettyClientConfig nettyClientConfig, // + final MessageStoreConfig messageStoreConfig // + ) { + this.brokerConfig = brokerConfig; + this.nettyServerConfig = nettyServerConfig; + this.nettyClientConfig = nettyClientConfig; + this.messageStoreConfig = messageStoreConfig; + this.consumerOffsetManager = new ConsumerOffsetManager(this); + this.topicConfigManager = new TopicConfigManager(this); + this.pullMessageProcessor = new PullMessageProcessor(this); + this.pullRequestHoldService = new PullRequestHoldService(this); + this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService); + this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this); + this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener); + this.producerManager = new ProducerManager(); + this.clientHousekeepingService = new ClientHousekeepingService(this); + this.broker2Client = new Broker2Client(this); + this.subscriptionGroupManager = new SubscriptionGroupManager(this); + this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig); + this.filterServerManager = new FilterServerManager(this); + + if (this.brokerConfig.getNamesrvAddr() != null) { + this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr()); + log.info("user specfied name server address: {}", this.brokerConfig.getNamesrvAddr()); + } + + this.slaveSynchronize = new SlaveSynchronize(this); + + this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity()); + + this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity()); + this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity()); + this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity()); + + this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName()); + this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort())); + + this.brokerFastFailure = new BrokerFastFailure(this); + this.configuration = new Configuration( + log, + BrokerPathConfigHelper.getBrokerConfigPath(), + this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig + ); + } + + public BrokerConfig getBrokerConfig() { + return brokerConfig; + } + + public NettyServerConfig getNettyServerConfig() { + return nettyServerConfig; + } + + public BlockingQueue<Runnable> getPullThreadPoolQueue() { + return pullThreadPoolQueue; + } + + public boolean initialize() throws CloneNotSupportedException { + boolean result = true; + + result = result && this.topicConfigManager.load(); + + result = result && this.consumerOffsetManager.load(); + result = result && this.subscriptionGroupManager.load(); + + if (result) { + try { + this.messageStore = + new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, + this.brokerConfig); + this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); + //load plugin + MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); + this.messageStore = MessageStoreFactory.build(context, this.messageStore); + } catch (IOException e) { + result = false; + e.printStackTrace(); + } + } + + result = result && this.messageStore.load(); + + if (result) { + this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService); + NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone(); + fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); + this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService); + this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( + this.brokerConfig.getSendMessageThreadPoolNums(), + this.brokerConfig.getSendMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.sendThreadPoolQueue, + new ThreadFactoryImpl("SendMessageThread_")); + + this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( + this.brokerConfig.getPullMessageThreadPoolNums(), + this.brokerConfig.getPullMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.pullThreadPoolQueue, + new ThreadFactoryImpl("PullMessageThread_")); + + this.adminBrokerExecutor = + Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( + "AdminBrokerThread_")); + + this.clientManageExecutor = new ThreadPoolExecutor( + this.brokerConfig.getClientManageThreadPoolNums(), + this.brokerConfig.getClientManageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.clientManagerThreadPoolQueue, + new ThreadFactoryImpl("ClientManageThread_")); + + this.consumerManageExecutor = + Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( + "ConsumerManageThread_")); + + this.registerProcessor(); + + + // TODO remove in future + final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis(); + final long period = 1000 * 60 * 60 * 24; + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + BrokerController.this.getBrokerStats().record(); + } catch (Throwable e) { + log.error("schedule record error.", e); + } + } + }, initialDelay, period, TimeUnit.MILLISECONDS); + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + BrokerController.this.consumerOffsetManager.persist(); + } catch (Throwable e) { + log.error("schedule persist consumerOffset error.", e); + } + } + }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS); + + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + BrokerController.this.protectBroker(); + } catch (Exception e) { + log.error("protectBroker error.", e); + } + } + }, 3, 3, TimeUnit.MINUTES); + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + BrokerController.this.printWaterMark(); + } catch (Exception e) { + log.error("printWaterMark error.", e); + } + } + }, 10, 1, TimeUnit.SECONDS); + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes()); + } catch (Throwable e) { + log.error("schedule dispatchBehindBytes error.", e); + } + } + }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); + + if (this.brokerConfig.getNamesrvAddr() != null) { + this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr()); + } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); + } catch (Throwable e) { + log.error("ScheduledTask fetchNameServerAddr exception", e); + } + } + }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); + } + + if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { + if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) { + this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress()); + this.updateMasterHAServerAddrPeriodically = false; + } else { + this.updateMasterHAServerAddrPeriodically = true; + } + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + BrokerController.this.slaveSynchronize.syncAll(); + } catch (Throwable e) { + log.error("ScheduledTask syncAll slave exception", e); + } + } + }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); + } else { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + BrokerController.this.printMasterAndSlaveDiff(); + } catch (Throwable e) { + log.error("schedule printMasterAndSlaveDiff error.", e); + } + } + }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); + } + } + + return result; + } + + public void registerProcessor() { + /** + * SendMessageProcessor + */ + SendMessageProcessor sendProcessor = new SendMessageProcessor(this); + sendProcessor.registerSendMessageHook(sendMessageHookList); + sendProcessor.registerConsumeMessageHook(consumeMessageHookList); + + this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); + /** + * PullMessageProcessor + */ + this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor); + this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); + + /** + * QueryMessageProcessor + */ + NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this); + this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor); + this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor); + + this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor); + + /** + * ClientManageProcessor + */ + ClientManageProcessor clientProcessor = new ClientManageProcessor(this); + this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor); + this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); + + this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); + + /** + * ConsumerManageProcessor + */ + ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this); + this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); + this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + + this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + + + /** + * EndTransactionProcessor + */ + this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor); + this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor); + + /** + * Default + */ + AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this); + this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); + this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); + } + + public BrokerStats getBrokerStats() { + return brokerStats; + } + + public void setBrokerStats(BrokerStats brokerStats) { + this.brokerStats = brokerStats; + } + + public void protectBroker() { + if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) { + final Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator(); + while (it.hasNext()) { + final Map.Entry<String, MomentStatsItem> next = it.next(); + final long fallBehindBytes = next.getValue().getValue().get(); + if (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) { + final String[] split = next.getValue().getStatsKey().split("@"); + final String group = split[2]; + LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", group, fallBehindBytes); + this.subscriptionGroupManager.disableConsume(group); + } + } + } + } + + public long headSlowTimeMills(BlockingQueue<Runnable> q) { + long slowTimeMills = 0; + final Runnable peek = q.peek(); + if (peek != null) { + RequestTask rt = BrokerFastFailure.castRunnable(peek); + slowTimeMills = this.messageStore.now() - rt.getCreateTimestamp(); + } + + if (slowTimeMills < 0) slowTimeMills = 0; + + return slowTimeMills; + } + + public long headSlowTimeMills4SendThreadPoolQueue() { + return this.headSlowTimeMills(this.sendThreadPoolQueue); + } + + public long headSlowTimeMills4PullThreadPoolQueue() { + return this.headSlowTimeMills(this.pullThreadPoolQueue); + } + + public void printWaterMark() { + LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue()); + LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue()); + } + + public MessageStore getMessageStore() { + return messageStore; + } + + public void setMessageStore(MessageStore messageStore) { + this.messageStore = messageStore; + } + + private void printMasterAndSlaveDiff() { + long diff = this.messageStore.slaveFallBehindMuch(); + + // XXX: warn and notify me + log.info("slave fall behind master, how much, {} bytes", diff); + } + + public Broker2Client getBroker2Client() { + return broker2Client; + } + + public ConsumerManager getConsumerManager() { + return consumerManager; + } + + public ConsumerOffsetManager getConsumerOffsetManager() { + return consumerOffsetManager; + } + + public MessageStoreConfig getMessageStoreConfig() { + return messageStoreConfig; + } + + public ProducerManager getProducerManager() { + return producerManager; + } + + public void setFastRemotingServer(RemotingServer fastRemotingServer) { + this.fastRemotingServer = fastRemotingServer; + } + + public PullMessageProcessor getPullMessageProcessor() { + return pullMessageProcessor; + } + + public PullRequestHoldService getPullRequestHoldService() { + return pullRequestHoldService; + } + + public SubscriptionGroupManager getSubscriptionGroupManager() { + return subscriptionGroupManager; + } + + public void shutdown() { + if (this.brokerStatsManager != null) { + this.brokerStatsManager.shutdown(); + } + + if (this.clientHousekeepingService != null) { + this.clientHousekeepingService.shutdown(); + } + + if (this.pullRequestHoldService != null) { + this.pullRequestHoldService.shutdown(); + } + + if (this.remotingServer != null) { + this.remotingServer.shutdown(); + } + + if (this.fastRemotingServer != null) { + this.fastRemotingServer.shutdown(); + } + + if (this.messageStore != null) { + this.messageStore.shutdown(); + } + + this.scheduledExecutorService.shutdown(); + try { + this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + } + + this.unregisterBrokerAll(); + + if (this.sendMessageExecutor != null) { + this.sendMessageExecutor.shutdown(); + } + + if (this.pullMessageExecutor != null) { + this.pullMessageExecutor.shutdown(); + } + + if (this.adminBrokerExecutor != null) { + this.adminBrokerExecutor.shutdown(); + } + + if (this.brokerOuterAPI != null) { + this.brokerOuterAPI.shutdown(); + } + + this.consumerOffsetManager.persist(); + + if (this.filterServerManager != null) { + this.filterServerManager.shutdown(); + } + + if (this.brokerFastFailure != null) { + this.brokerFastFailure.shutdown(); + } + } + + private void unregisterBrokerAll() { + this.brokerOuterAPI.unregisterBrokerAll( + this.brokerConfig.getBrokerClusterName(), + this.getBrokerAddr(), + this.brokerConfig.getBrokerName(), + this.brokerConfig.getBrokerId()); + } + + public String getBrokerAddr() { + return this.brokerConfig.getBrokerIP1() + ":" + this.nettyServerConfig.getListenPort(); + } + + public void start() throws Exception { + if (this.messageStore != null) { + this.messageStore.start(); + } + + if (this.remotingServer != null) { + this.remotingServer.start(); + } + + if (this.fastRemotingServer != null) { + this.fastRemotingServer.start(); + } + + if (this.brokerOuterAPI != null) { + this.brokerOuterAPI.start(); + } + + if (this.pullRequestHoldService != null) { + this.pullRequestHoldService.start(); + } + + if (this.clientHousekeepingService != null) { + this.clientHousekeepingService.start(); + } + + if (this.filterServerManager != null) { + this.filterServerManager.start(); + } + + this.registerBrokerAll(true, false); + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + BrokerController.this.registerBrokerAll(true, false); + } catch (Throwable e) { + log.error("registerBrokerAll Exception", e); + } + } + }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS); + + if (this.brokerStatsManager != null) { + this.brokerStatsManager.start(); + } + + if (this.brokerFastFailure != null) { + this.brokerFastFailure.start(); + } + } + + public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) { + TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); + + if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) + || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { + ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); + for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) { + TopicConfig tmp = + new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), + this.brokerConfig.getBrokerPermission()); + topicConfigTable.put(topicConfig.getTopicName(), tmp); + } + topicConfigWrapper.setTopicConfigTable(topicConfigTable); + } + + RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll( + this.brokerConfig.getBrokerClusterName(), + this.getBrokerAddr(), + this.brokerConfig.getBrokerName(), + this.brokerConfig.getBrokerId(), + this.getHAServerAddr(), + topicConfigWrapper, + this.filterServerManager.buildNewFilterServerList(), + oneway, + this.brokerConfig.getRegisterBrokerTimeoutMills()); + + if (registerBrokerResult != null) { + if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { + this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); + } + + this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr()); + + if (checkOrderConfig) { + this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); + } + } + } + + public TopicConfigManager getTopicConfigManager() { + return topicConfigManager; + } + + public void setTopicConfigManager(TopicConfigManager topicConfigManager) { + this.topicConfigManager = topicConfigManager; + } + + public String getHAServerAddr() { + return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort(); + } + + public RebalanceLockManager getRebalanceLockManager() { + return rebalanceLockManager; + } + + public SlaveSynchronize getSlaveSynchronize() { + return slaveSynchronize; + } + + public ExecutorService getPullMessageExecutor() { + return pullMessageExecutor; + } + + public void setPullMessageExecutor(ExecutorService pullMessageExecutor) { + this.pullMessageExecutor = pullMessageExecutor; + } + + public BlockingQueue<Runnable> getSendThreadPoolQueue() { + return sendThreadPoolQueue; + } + + public FilterServerManager getFilterServerManager() { + return filterServerManager; + } + + public BrokerStatsManager getBrokerStatsManager() { + return brokerStatsManager; + } + + public List<SendMessageHook> getSendMessageHookList() { + return sendMessageHookList; + } + + public void registerSendMessageHook(final SendMessageHook hook) { + this.sendMessageHookList.add(hook); + log.info("register SendMessageHook Hook, {}", hook.hookName()); + } + + public List<ConsumeMessageHook> getConsumeMessageHookList() { + return consumeMessageHookList; + } + + public void registerConsumeMessageHook(final ConsumeMessageHook hook) { + this.consumeMessageHookList.add(hook); + log.info("register ConsumeMessageHook Hook, {}", hook.hookName()); + } + + public void registerServerRPCHook(RPCHook rpcHook) { + getRemotingServer().registerRPCHook(rpcHook); + } + + public RemotingServer getRemotingServer() { + return remotingServer; + } + + public void setRemotingServer(RemotingServer remotingServer) { + this.remotingServer = remotingServer; + } + + public void registerClientRPCHook(RPCHook rpcHook) { + this.getBrokerOuterAPI().registerRPCHook(rpcHook); + } + + public BrokerOuterAPI getBrokerOuterAPI() { + return brokerOuterAPI; + } + + public InetSocketAddress getStoreHost() { + return storeHost; + } + + public void setStoreHost(InetSocketAddress storeHost) { + this.storeHost = storeHost; + } + + public Configuration getConfiguration() { + return this.configuration; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java new file mode 100644 index 0000000..dbcd304 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker; + +import java.io.File; + + +public class BrokerPathConfigHelper { + private static String brokerConfigPath = System.getProperty("user.home") + File.separator + "store" + + File.separator + "config" + File.separator + "broker.properties"; + + + public static String getBrokerConfigPath() { + return brokerConfigPath; + } + + + public static void setBrokerConfigPath(String path) { + brokerConfigPath = path; + } + + + public static String getTopicConfigPath(final String rootDir) { + return rootDir + File.separator + "config" + File.separator + "topics.json"; + } + + + public static String getConsumerOffsetPath(final String rootDir) { + return rootDir + File.separator + "config" + File.separator + "consumerOffset.json"; + } + + + public static String getSubscriptionGroupPath(final String rootDir) { + return rootDir + File.separator + "config" + File.separator + "subscriptionGroup.json"; + } + +}
