http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java b/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java deleted file mode 100644 index b2b6aed..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java +++ /dev/null @@ -1,773 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker; - -import com.alibaba.rocketmq.broker.client.*; -import com.alibaba.rocketmq.broker.client.net.Broker2Client; -import com.alibaba.rocketmq.broker.client.rebalance.RebalanceLockManager; -import com.alibaba.rocketmq.broker.filtersrv.FilterServerManager; -import com.alibaba.rocketmq.broker.latency.BrokerFastFailure; -import com.alibaba.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor; -import com.alibaba.rocketmq.broker.longpolling.NotifyMessageArrivingListener; -import com.alibaba.rocketmq.broker.longpolling.PullRequestHoldService; -import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook; -import com.alibaba.rocketmq.broker.mqtrace.SendMessageHook; -import com.alibaba.rocketmq.broker.offset.ConsumerOffsetManager; -import com.alibaba.rocketmq.broker.out.BrokerOuterAPI; -import com.alibaba.rocketmq.broker.plugin.MessageStoreFactory; -import com.alibaba.rocketmq.broker.plugin.MessageStorePluginContext; -import com.alibaba.rocketmq.broker.processor.*; -import com.alibaba.rocketmq.broker.slave.SlaveSynchronize; -import com.alibaba.rocketmq.broker.subscription.SubscriptionGroupManager; -import com.alibaba.rocketmq.broker.topic.TopicConfigManager; -import com.alibaba.rocketmq.common.*; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.constant.PermName; -import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult; -import com.alibaba.rocketmq.common.protocol.RequestCode; -import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; -import com.alibaba.rocketmq.common.stats.MomentStatsItem; -import com.alibaba.rocketmq.remoting.RPCHook; -import com.alibaba.rocketmq.remoting.RemotingServer; -import com.alibaba.rocketmq.remoting.netty.*; -import com.alibaba.rocketmq.store.DefaultMessageStore; -import com.alibaba.rocketmq.store.MessageArrivingListener; -import com.alibaba.rocketmq.store.MessageStore; -import com.alibaba.rocketmq.store.config.BrokerRole; -import com.alibaba.rocketmq.store.config.MessageStoreConfig; -import com.alibaba.rocketmq.store.stats.BrokerStats; -import com.alibaba.rocketmq.store.stats.BrokerStatsManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.*; - - -/** - * @author shijia.wxr - */ -public class BrokerController { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME); - private static final Logger LOG_WATER_MARK = LoggerFactory.getLogger(LoggerName.WATER_MARK_LOGGER_NAME); - private final BrokerConfig brokerConfig; - private final NettyServerConfig nettyServerConfig; - private final NettyClientConfig nettyClientConfig; - private final MessageStoreConfig messageStoreConfig; - private final ConsumerOffsetManager consumerOffsetManager; - private final ConsumerManager consumerManager; - private final ProducerManager producerManager; - private final ClientHousekeepingService clientHousekeepingService; - private final PullMessageProcessor pullMessageProcessor; - private final PullRequestHoldService pullRequestHoldService; - private final MessageArrivingListener messageArrivingListener; - private final Broker2Client broker2Client; - private final SubscriptionGroupManager subscriptionGroupManager; - private final ConsumerIdsChangeListener consumerIdsChangeListener; - private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager(); - private final BrokerOuterAPI brokerOuterAPI; - private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( - "BrokerControllerScheduledThread")); - private final SlaveSynchronize slaveSynchronize; - private final BlockingQueue<Runnable> sendThreadPoolQueue; - private final BlockingQueue<Runnable> pullThreadPoolQueue; - private final BlockingQueue<Runnable> clientManagerThreadPoolQueue; - private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue; - private final FilterServerManager filterServerManager; - private final BrokerStatsManager brokerStatsManager; - private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>(); - private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>(); - private MessageStore messageStore; - private RemotingServer remotingServer; - private RemotingServer fastRemotingServer; - private TopicConfigManager topicConfigManager; - private ExecutorService sendMessageExecutor; - private ExecutorService pullMessageExecutor; - private ExecutorService adminBrokerExecutor; - private ExecutorService clientManageExecutor; - private ExecutorService consumerManageExecutor; - private boolean updateMasterHAServerAddrPeriodically = false; - private BrokerStats brokerStats; - private InetSocketAddress storeHost; - private BrokerFastFailure brokerFastFailure; - private Configuration configuration; - - public BrokerController(// - final BrokerConfig brokerConfig, // - final NettyServerConfig nettyServerConfig, // - final NettyClientConfig nettyClientConfig, // - final MessageStoreConfig messageStoreConfig // - ) { - this.brokerConfig = brokerConfig; - this.nettyServerConfig = nettyServerConfig; - this.nettyClientConfig = nettyClientConfig; - this.messageStoreConfig = messageStoreConfig; - this.consumerOffsetManager = new ConsumerOffsetManager(this); - this.topicConfigManager = new TopicConfigManager(this); - this.pullMessageProcessor = new PullMessageProcessor(this); - this.pullRequestHoldService = new PullRequestHoldService(this); - this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService); - this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this); - this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener); - this.producerManager = new ProducerManager(); - this.clientHousekeepingService = new ClientHousekeepingService(this); - this.broker2Client = new Broker2Client(this); - this.subscriptionGroupManager = new SubscriptionGroupManager(this); - this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig); - this.filterServerManager = new FilterServerManager(this); - - if (this.brokerConfig.getNamesrvAddr() != null) { - this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr()); - log.info("user specfied name server address: {}", this.brokerConfig.getNamesrvAddr()); - } - - this.slaveSynchronize = new SlaveSynchronize(this); - - this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity()); - - this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity()); - this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity()); - this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity()); - - this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName()); - this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort())); - - this.brokerFastFailure = new BrokerFastFailure(this); - this.configuration = new Configuration( - log, - BrokerPathConfigHelper.getBrokerConfigPath(), - this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig - ); - } - - public BrokerConfig getBrokerConfig() { - return brokerConfig; - } - - public NettyServerConfig getNettyServerConfig() { - return nettyServerConfig; - } - - public BlockingQueue<Runnable> getPullThreadPoolQueue() { - return pullThreadPoolQueue; - } - - public boolean initialize() throws CloneNotSupportedException { - boolean result = true; - - result = result && this.topicConfigManager.load(); - - result = result && this.consumerOffsetManager.load(); - result = result && this.subscriptionGroupManager.load(); - - if (result) { - try { - this.messageStore = - new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, - this.brokerConfig); - this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); - //load plugin - MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); - this.messageStore = MessageStoreFactory.build(context, this.messageStore); - } catch (IOException e) { - result = false; - e.printStackTrace(); - } - } - - result = result && this.messageStore.load(); - - if (result) { - this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService); - NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone(); - fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); - this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService); - this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getSendMessageThreadPoolNums(), - this.brokerConfig.getSendMessageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.sendThreadPoolQueue, - new ThreadFactoryImpl("SendMessageThread_")); - - this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getPullMessageThreadPoolNums(), - this.brokerConfig.getPullMessageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.pullThreadPoolQueue, - new ThreadFactoryImpl("PullMessageThread_")); - - this.adminBrokerExecutor = - Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( - "AdminBrokerThread_")); - - this.clientManageExecutor = new ThreadPoolExecutor( - this.brokerConfig.getClientManageThreadPoolNums(), - this.brokerConfig.getClientManageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.clientManagerThreadPoolQueue, - new ThreadFactoryImpl("ClientManageThread_")); - - this.consumerManageExecutor = - Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( - "ConsumerManageThread_")); - - this.registerProcessor(); - - - // TODO remove in future - final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis(); - final long period = 1000 * 60 * 60 * 24; - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - BrokerController.this.getBrokerStats().record(); - } catch (Throwable e) { - log.error("schedule record error.", e); - } - } - }, initialDelay, period, TimeUnit.MILLISECONDS); - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - BrokerController.this.consumerOffsetManager.persist(); - } catch (Throwable e) { - log.error("schedule persist consumerOffset error.", e); - } - } - }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS); - - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - BrokerController.this.protectBroker(); - } catch (Exception e) { - log.error("protectBroker error.", e); - } - } - }, 3, 3, TimeUnit.MINUTES); - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - BrokerController.this.printWaterMark(); - } catch (Exception e) { - log.error("printWaterMark error.", e); - } - } - }, 10, 1, TimeUnit.SECONDS); - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - try { - log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes()); - } catch (Throwable e) { - log.error("schedule dispatchBehindBytes error.", e); - } - } - }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); - - if (this.brokerConfig.getNamesrvAddr() != null) { - this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr()); - } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) { - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - try { - BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); - } catch (Throwable e) { - log.error("ScheduledTask fetchNameServerAddr exception", e); - } - } - }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); - } - - if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { - if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) { - this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress()); - this.updateMasterHAServerAddrPeriodically = false; - } else { - this.updateMasterHAServerAddrPeriodically = true; - } - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - try { - BrokerController.this.slaveSynchronize.syncAll(); - } catch (Throwable e) { - log.error("ScheduledTask syncAll slave exception", e); - } - } - }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); - } else { - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - try { - BrokerController.this.printMasterAndSlaveDiff(); - } catch (Throwable e) { - log.error("schedule printMasterAndSlaveDiff error.", e); - } - } - }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); - } - } - - return result; - } - - public void registerProcessor() { - /** - * SendMessageProcessor - */ - SendMessageProcessor sendProcessor = new SendMessageProcessor(this); - sendProcessor.registerSendMessageHook(sendMessageHookList); - sendProcessor.registerConsumeMessageHook(consumeMessageHookList); - - this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); - this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); - this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); - /** - * PullMessageProcessor - */ - this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor); - this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); - - /** - * QueryMessageProcessor - */ - NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this); - this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor); - this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor); - - this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor); - - /** - * ClientManageProcessor - */ - ClientManageProcessor clientProcessor = new ClientManageProcessor(this); - this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor); - this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); - - this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); - - /** - * ConsumerManageProcessor - */ - ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this); - this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); - this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - - this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - - - /** - * EndTransactionProcessor - */ - this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor); - this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor); - - /** - * Default - */ - AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this); - this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); - this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); - } - - public BrokerStats getBrokerStats() { - return brokerStats; - } - - public void setBrokerStats(BrokerStats brokerStats) { - this.brokerStats = brokerStats; - } - - public void protectBroker() { - if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) { - final Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator(); - while (it.hasNext()) { - final Map.Entry<String, MomentStatsItem> next = it.next(); - final long fallBehindBytes = next.getValue().getValue().get(); - if (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) { - final String[] split = next.getValue().getStatsKey().split("@"); - final String group = split[2]; - LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", group, fallBehindBytes); - this.subscriptionGroupManager.disableConsume(group); - } - } - } - } - - public long headSlowTimeMills(BlockingQueue<Runnable> q) { - long slowTimeMills = 0; - final Runnable peek = q.peek(); - if (peek != null) { - RequestTask rt = BrokerFastFailure.castRunnable(peek); - slowTimeMills = this.messageStore.now() - rt.getCreateTimestamp(); - } - - if (slowTimeMills < 0) slowTimeMills = 0; - - return slowTimeMills; - } - - public long headSlowTimeMills4SendThreadPoolQueue() { - return this.headSlowTimeMills(this.sendThreadPoolQueue); - } - - public long headSlowTimeMills4PullThreadPoolQueue() { - return this.headSlowTimeMills(this.pullThreadPoolQueue); - } - - public void printWaterMark() { - LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue()); - LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue()); - } - - public MessageStore getMessageStore() { - return messageStore; - } - - public void setMessageStore(MessageStore messageStore) { - this.messageStore = messageStore; - } - - private void printMasterAndSlaveDiff() { - long diff = this.messageStore.slaveFallBehindMuch(); - - // XXX: warn and notify me - log.info("slave fall behind master, how much, {} bytes", diff); - } - - public Broker2Client getBroker2Client() { - return broker2Client; - } - - public ConsumerManager getConsumerManager() { - return consumerManager; - } - - public ConsumerOffsetManager getConsumerOffsetManager() { - return consumerOffsetManager; - } - - public MessageStoreConfig getMessageStoreConfig() { - return messageStoreConfig; - } - - public ProducerManager getProducerManager() { - return producerManager; - } - - public void setFastRemotingServer(RemotingServer fastRemotingServer) { - this.fastRemotingServer = fastRemotingServer; - } - - public PullMessageProcessor getPullMessageProcessor() { - return pullMessageProcessor; - } - - public PullRequestHoldService getPullRequestHoldService() { - return pullRequestHoldService; - } - - public SubscriptionGroupManager getSubscriptionGroupManager() { - return subscriptionGroupManager; - } - - public void shutdown() { - if (this.brokerStatsManager != null) { - this.brokerStatsManager.shutdown(); - } - - if (this.clientHousekeepingService != null) { - this.clientHousekeepingService.shutdown(); - } - - if (this.pullRequestHoldService != null) { - this.pullRequestHoldService.shutdown(); - } - - if (this.remotingServer != null) { - this.remotingServer.shutdown(); - } - - if (this.fastRemotingServer != null) { - this.fastRemotingServer.shutdown(); - } - - if (this.messageStore != null) { - this.messageStore.shutdown(); - } - - this.scheduledExecutorService.shutdown(); - try { - this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - } - - this.unregisterBrokerAll(); - - if (this.sendMessageExecutor != null) { - this.sendMessageExecutor.shutdown(); - } - - if (this.pullMessageExecutor != null) { - this.pullMessageExecutor.shutdown(); - } - - if (this.adminBrokerExecutor != null) { - this.adminBrokerExecutor.shutdown(); - } - - if (this.brokerOuterAPI != null) { - this.brokerOuterAPI.shutdown(); - } - - this.consumerOffsetManager.persist(); - - if (this.filterServerManager != null) { - this.filterServerManager.shutdown(); - } - - if (this.brokerFastFailure != null) { - this.brokerFastFailure.shutdown(); - } - } - - private void unregisterBrokerAll() { - this.brokerOuterAPI.unregisterBrokerAll( - this.brokerConfig.getBrokerClusterName(), - this.getBrokerAddr(), - this.brokerConfig.getBrokerName(), - this.brokerConfig.getBrokerId()); - } - - public String getBrokerAddr() { - return this.brokerConfig.getBrokerIP1() + ":" + this.nettyServerConfig.getListenPort(); - } - - public void start() throws Exception { - if (this.messageStore != null) { - this.messageStore.start(); - } - - if (this.remotingServer != null) { - this.remotingServer.start(); - } - - if (this.fastRemotingServer != null) { - this.fastRemotingServer.start(); - } - - if (this.brokerOuterAPI != null) { - this.brokerOuterAPI.start(); - } - - if (this.pullRequestHoldService != null) { - this.pullRequestHoldService.start(); - } - - if (this.clientHousekeepingService != null) { - this.clientHousekeepingService.start(); - } - - if (this.filterServerManager != null) { - this.filterServerManager.start(); - } - - this.registerBrokerAll(true, false); - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - try { - BrokerController.this.registerBrokerAll(true, false); - } catch (Throwable e) { - log.error("registerBrokerAll Exception", e); - } - } - }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS); - - if (this.brokerStatsManager != null) { - this.brokerStatsManager.start(); - } - - if (this.brokerFastFailure != null) { - this.brokerFastFailure.start(); - } - } - - public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) { - TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); - - if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) - || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { - ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); - for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) { - TopicConfig tmp = - new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), - this.brokerConfig.getBrokerPermission()); - topicConfigTable.put(topicConfig.getTopicName(), tmp); - } - topicConfigWrapper.setTopicConfigTable(topicConfigTable); - } - - RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll( - this.brokerConfig.getBrokerClusterName(), - this.getBrokerAddr(), - this.brokerConfig.getBrokerName(), - this.brokerConfig.getBrokerId(), - this.getHAServerAddr(), - topicConfigWrapper, - this.filterServerManager.buildNewFilterServerList(), - oneway, - this.brokerConfig.getRegisterBrokerTimeoutMills()); - - if (registerBrokerResult != null) { - if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { - this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); - } - - this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr()); - - if (checkOrderConfig) { - this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); - } - } - } - - public TopicConfigManager getTopicConfigManager() { - return topicConfigManager; - } - - public void setTopicConfigManager(TopicConfigManager topicConfigManager) { - this.topicConfigManager = topicConfigManager; - } - - public String getHAServerAddr() { - return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort(); - } - - public RebalanceLockManager getRebalanceLockManager() { - return rebalanceLockManager; - } - - public SlaveSynchronize getSlaveSynchronize() { - return slaveSynchronize; - } - - public ExecutorService getPullMessageExecutor() { - return pullMessageExecutor; - } - - public void setPullMessageExecutor(ExecutorService pullMessageExecutor) { - this.pullMessageExecutor = pullMessageExecutor; - } - - public BlockingQueue<Runnable> getSendThreadPoolQueue() { - return sendThreadPoolQueue; - } - - public FilterServerManager getFilterServerManager() { - return filterServerManager; - } - - public BrokerStatsManager getBrokerStatsManager() { - return brokerStatsManager; - } - - public List<SendMessageHook> getSendMessageHookList() { - return sendMessageHookList; - } - - public void registerSendMessageHook(final SendMessageHook hook) { - this.sendMessageHookList.add(hook); - log.info("register SendMessageHook Hook, {}", hook.hookName()); - } - - public List<ConsumeMessageHook> getConsumeMessageHookList() { - return consumeMessageHookList; - } - - public void registerConsumeMessageHook(final ConsumeMessageHook hook) { - this.consumeMessageHookList.add(hook); - log.info("register ConsumeMessageHook Hook, {}", hook.hookName()); - } - - public void registerServerRPCHook(RPCHook rpcHook) { - getRemotingServer().registerRPCHook(rpcHook); - } - - public RemotingServer getRemotingServer() { - return remotingServer; - } - - public void setRemotingServer(RemotingServer remotingServer) { - this.remotingServer = remotingServer; - } - - public void registerClientRPCHook(RPCHook rpcHook) { - this.getBrokerOuterAPI().registerRPCHook(rpcHook); - } - - public BrokerOuterAPI getBrokerOuterAPI() { - return brokerOuterAPI; - } - - public InetSocketAddress getStoreHost() { - return storeHost; - } - - public void setStoreHost(InetSocketAddress storeHost) { - this.storeHost = storeHost; - } - - public Configuration getConfiguration() { - return this.configuration; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerPathConfigHelper.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerPathConfigHelper.java b/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerPathConfigHelper.java deleted file mode 100644 index 055e8dc..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerPathConfigHelper.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.broker; - -import java.io.File; - - -public class BrokerPathConfigHelper { - private static String brokerConfigPath = System.getProperty("user.home") + File.separator + "store" - + File.separator + "config" + File.separator + "broker.properties"; - - - public static String getBrokerConfigPath() { - return brokerConfigPath; - } - - - public static void setBrokerConfigPath(String path) { - brokerConfigPath = path; - } - - - public static String getTopicConfigPath(final String rootDir) { - return rootDir + File.separator + "config" + File.separator + "topics.json"; - } - - - public static String getConsumerOffsetPath(final String rootDir) { - return rootDir + File.separator + "config" + File.separator + "consumerOffset.json"; - } - - - public static String getSubscriptionGroupPath(final String rootDir) { - return rootDir + File.separator + "config" + File.separator + "subscriptionGroup.json"; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java deleted file mode 100644 index 7e81117..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java +++ /dev/null @@ -1,266 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker; - -import ch.qos.logback.classic.LoggerContext; -import ch.qos.logback.classic.joran.JoranConfigurator; -import com.alibaba.rocketmq.common.BrokerConfig; -import com.alibaba.rocketmq.common.MQVersion; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.remoting.common.RemotingUtil; -import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; -import com.alibaba.rocketmq.remoting.netty.NettyServerConfig; -import com.alibaba.rocketmq.remoting.netty.NettySystemConfig; -import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; -import com.alibaba.rocketmq.srvutil.ServerUtil; -import com.alibaba.rocketmq.store.config.BrokerRole; -import com.alibaba.rocketmq.store.config.MessageStoreConfig; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.InputStream; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - - -/** - * @author shijia.wxr - */ -public class BrokerStartup { - public static Properties properties = null; - public static CommandLine commandLine = null; - public static String configFile = null; - public static Logger log; - - public static void main(String[] args) { - start(createBrokerController(args)); - } - - public static BrokerController start(BrokerController controller) { - try { - controller.start(); - String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", " - + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); - - if (null != controller.getBrokerConfig().getNamesrvAddr()) { - tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr(); - } - - log.info(tip); - return controller; - } catch (Throwable e) { - e.printStackTrace(); - System.exit(-1); - } - - return null; - } - - public static BrokerController createBrokerController(String[] args) { - System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); - - if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { - NettySystemConfig.socketSndbufSize = 131072; - } - - if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { - NettySystemConfig.socketRcvbufSize = 131072; - } - - try { - //PackageConflictDetect.detectFastjson(); - Options options = ServerUtil.buildCommandlineOptions(new Options()); - commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options), - new PosixParser()); - if (null == commandLine) { - System.exit(-1); - } - - final BrokerConfig brokerConfig = new BrokerConfig(); - final NettyServerConfig nettyServerConfig = new NettyServerConfig(); - final NettyClientConfig nettyClientConfig = new NettyClientConfig(); - nettyServerConfig.setListenPort(10911); - final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - - if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) { - int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10; - messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio); - } - - if (commandLine.hasOption('p')) { - MixAll.printObjectProperties(null, brokerConfig); - MixAll.printObjectProperties(null, nettyServerConfig); - MixAll.printObjectProperties(null, nettyClientConfig); - MixAll.printObjectProperties(null, messageStoreConfig); - System.exit(0); - } else if (commandLine.hasOption('m')) { - MixAll.printObjectProperties(null, brokerConfig, true); - MixAll.printObjectProperties(null, nettyServerConfig, true); - MixAll.printObjectProperties(null, nettyClientConfig, true); - MixAll.printObjectProperties(null, messageStoreConfig, true); - System.exit(0); - } - - if (commandLine.hasOption('c')) { - String file = commandLine.getOptionValue('c'); - if (file != null) { - configFile = file; - InputStream in = new BufferedInputStream(new FileInputStream(file)); - properties = new Properties(); - properties.load(in); - - parsePropertie2SystemEnv(properties); - MixAll.properties2Object(properties, brokerConfig); - MixAll.properties2Object(properties, nettyServerConfig); - MixAll.properties2Object(properties, nettyClientConfig); - MixAll.properties2Object(properties, messageStoreConfig); - - BrokerPathConfigHelper.setBrokerConfigPath(file); - in.close(); - } - } - - MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig); - - if (null == brokerConfig.getRocketmqHome()) { - System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV - + " variable in your environment to match the location of the RocketMQ installation"); - System.exit(-2); - } - - String namesrvAddr = brokerConfig.getNamesrvAddr(); - if (null != namesrvAddr) { - try { - String[] addrArray = namesrvAddr.split(";"); - if (addrArray != null) { - for (String addr : addrArray) { - RemotingUtil.string2SocketAddress(addr); - } - } - } catch (Exception e) { - System.out.printf( - "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n", - namesrvAddr); - System.exit(-3); - } - } - - - switch (messageStoreConfig.getBrokerRole()) { - case ASYNC_MASTER: - case SYNC_MASTER: - brokerConfig.setBrokerId(MixAll.MASTER_ID); - break; - case SLAVE: - if (brokerConfig.getBrokerId() <= 0) { - System.out.printf("Slave's brokerId must be > 0"); - System.exit(-3); - } - - break; - default: - break; - } - - messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1); - LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); - JoranConfigurator configurator = new JoranConfigurator(); - configurator.setContext(lc); - lc.reset(); - configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml"); - log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - - MixAll.printObjectProperties(log, brokerConfig); - MixAll.printObjectProperties(log, nettyServerConfig); - MixAll.printObjectProperties(log, nettyClientConfig); - MixAll.printObjectProperties(log, messageStoreConfig); - - final BrokerController controller = new BrokerController(// - brokerConfig, // - nettyServerConfig, // - nettyClientConfig, // - messageStoreConfig); - // remember all configs to prevent discard - controller.getConfiguration().registerConfig(properties); - - boolean initResult = controller.initialize(); - if (!initResult) { - controller.shutdown(); - System.exit(-3); - } - - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - private volatile boolean hasShutdown = false; - private AtomicInteger shutdownTimes = new AtomicInteger(0); - - @Override - public void run() { - synchronized (this) { - log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet()); - if (!this.hasShutdown) { - this.hasShutdown = true; - long begineTime = System.currentTimeMillis(); - controller.shutdown(); - long consumingTimeTotal = System.currentTimeMillis() - begineTime; - log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal); - } - } - } - }, "ShutdownHook")); - - return controller; - } catch (Throwable e) { - e.printStackTrace(); - System.exit(-1); - } - - return null; - } - - private static void parsePropertie2SystemEnv(Properties properties) { - if (properties == null) { - return; - } - String rmqAddressServerDomain = properties.getProperty("rmqAddressServerDomain", "jmenv.tbsite.net"); - String rmqAddressServerSubGroup = properties.getProperty("rmqAddressServerSubGroup", "nsaddr"); - System.setProperty("rocketmq.namesrv.domain", rmqAddressServerDomain); - System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup); - } - - public static Options buildCommandlineOptions(final Options options) { - Option opt = new Option("c", "configFile", true, "Broker config properties file"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("p", "printConfigItem", false, "Print all config item"); - opt.setRequired(false); - options.addOption(opt); - - opt = new Option("m", "printImportantConfig", false, "Print important config item"); - opt.setRequired(false); - options.addOption(opt); - - return options; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientChannelInfo.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientChannelInfo.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientChannelInfo.java deleted file mode 100644 index babf4b7..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientChannelInfo.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.client; - -import com.alibaba.rocketmq.remoting.protocol.LanguageCode; -import io.netty.channel.Channel; - - -/** - * @author shijia.wxr - */ -public class ClientChannelInfo { - private final Channel channel; - private final String clientId; - private final LanguageCode language; - private final int version; - private volatile long lastUpdateTimestamp = System.currentTimeMillis(); - - - public ClientChannelInfo(Channel channel) { - this(channel, null, null, 0); - } - - - public ClientChannelInfo(Channel channel, String clientId, LanguageCode language, int version) { - this.channel = channel; - this.clientId = clientId; - this.language = language; - this.version = version; - } - - - public Channel getChannel() { - return channel; - } - - - public String getClientId() { - return clientId; - } - - - public LanguageCode getLanguage() { - return language; - } - - - public int getVersion() { - return version; - } - - - public long getLastUpdateTimestamp() { - return lastUpdateTimestamp; - } - - - public void setLastUpdateTimestamp(long lastUpdateTimestamp) { - this.lastUpdateTimestamp = lastUpdateTimestamp; - } - - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((channel == null) ? 0 : channel.hashCode()); - result = prime * result + ((clientId == null) ? 0 : clientId.hashCode()); - result = prime * result + ((language == null) ? 0 : language.hashCode()); - result = prime * result + (int) (lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32)); - result = prime * result + version; - return result; - } - - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - ClientChannelInfo other = (ClientChannelInfo) obj; - if (channel == null) { - if (other.channel != null) - return false; - } else if (this.channel != other.channel) { - return false; - } - - return true; - } - - - @Override - public String toString() { - return "ClientChannelInfo [channel=" + channel + ", clientId=" + clientId + ", language=" + language - + ", version=" + version + ", lastUpdateTimestamp=" + lastUpdateTimestamp + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientHousekeepingService.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientHousekeepingService.java deleted file mode 100644 index 4ac7532..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientHousekeepingService.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.client; - -import com.alibaba.rocketmq.broker.BrokerController; -import com.alibaba.rocketmq.common.ThreadFactoryImpl; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.remoting.ChannelEventListener; -import io.netty.channel.Channel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - - -/** - * @author shijia.wxr - */ -public class ClientHousekeepingService implements ChannelEventListener { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private final BrokerController brokerController; - - private ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread")); - - - public ClientHousekeepingService(final BrokerController brokerController) { - this.brokerController = brokerController; - } - - - public void start() { - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - ClientHousekeepingService.this.scanExceptionChannel(); - } catch (Exception e) { - log.error("", e); - } - } - }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS); - } - - private void scanExceptionChannel() { - this.brokerController.getProducerManager().scanNotActiveChannel(); - this.brokerController.getConsumerManager().scanNotActiveChannel(); - this.brokerController.getFilterServerManager().scanNotActiveChannel(); - } - - public void shutdown() { - this.scheduledExecutorService.shutdown(); - } - - @Override - public void onChannelConnect(String remoteAddr, Channel channel) { - - } - - - @Override - public void onChannelClose(String remoteAddr, Channel channel) { - this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); - this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); - this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); - } - - - @Override - public void onChannelException(String remoteAddr, Channel channel) { - this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); - this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); - this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); - } - - - @Override - public void onChannelIdle(String remoteAddr, Channel channel) { - this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); - this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); - this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java deleted file mode 100644 index 410b703..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java +++ /dev/null @@ -1,273 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.client; - -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; -import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; -import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; -import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; -import io.netty.channel.Channel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - - -/** - * @author shijia.wxr - */ -public class ConsumerGroupInfo { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private final String groupName; - private final ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable = - new ConcurrentHashMap<String, SubscriptionData>(); - private final ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = - new ConcurrentHashMap<Channel, ClientChannelInfo>(16); - private volatile ConsumeType consumeType; - private volatile MessageModel messageModel; - private volatile ConsumeFromWhere consumeFromWhere; - private volatile long lastUpdateTimestamp = System.currentTimeMillis(); - - - public ConsumerGroupInfo(String groupName, ConsumeType consumeType, MessageModel messageModel, - ConsumeFromWhere consumeFromWhere) { - this.groupName = groupName; - this.consumeType = consumeType; - this.messageModel = messageModel; - this.consumeFromWhere = consumeFromWhere; - } - - - public ClientChannelInfo findChannel(final String clientId) { - Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<Channel, ClientChannelInfo> next = it.next(); - if (next.getValue().getClientId().equals(clientId)) { - return next.getValue(); - } - } - - return null; - } - - - public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() { - return subscriptionTable; - } - - - public ConcurrentHashMap<Channel, ClientChannelInfo> getChannelInfoTable() { - return channelInfoTable; - } - - - public List<Channel> getAllChannel() { - List<Channel> result = new ArrayList<Channel>(); - - result.addAll(this.channelInfoTable.keySet()); - - return result; - } - - - public List<String> getAllClientId() { - List<String> result = new ArrayList<String>(); - - Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator(); - - while (it.hasNext()) { - Entry<Channel, ClientChannelInfo> entry = it.next(); - ClientChannelInfo clientChannelInfo = entry.getValue(); - result.add(clientChannelInfo.getClientId()); - } - - return result; - } - - - public void unregisterChannel(final ClientChannelInfo clientChannelInfo) { - ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel()); - if (old != null) { - log.info("unregister a consumer[{}] from consumerGroupInfo {}", this.groupName, old.toString()); - } - } - - - public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) { - final ClientChannelInfo info = this.channelInfoTable.remove(channel); - if (info != null) { - log.warn( - "NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}", - info.toString(), groupName); - return true; - } - - return false; - } - - public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType, - MessageModel messageModel, ConsumeFromWhere consumeFromWhere) { - boolean updated = false; - this.consumeType = consumeType; - this.messageModel = messageModel; - this.consumeFromWhere = consumeFromWhere; - - ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel()); - if (null == infoOld) { - ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew); - if (null == prev) { - log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType, - messageModel, infoNew.toString()); - updated = true; - } - - infoOld = infoNew; - } else { - if (!infoOld.getClientId().equals(infoNew.getClientId())) { - log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ", - this.groupName, - infoOld.toString(), - infoNew.toString()); - this.channelInfoTable.put(infoNew.getChannel(), infoNew); - } - } - - this.lastUpdateTimestamp = System.currentTimeMillis(); - infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp); - - return updated; - } - - - public boolean updateSubscription(final Set<SubscriptionData> subList) { - boolean updated = false; - - for (SubscriptionData sub : subList) { - SubscriptionData old = this.subscriptionTable.get(sub.getTopic()); - if (old == null) { - SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub); - if (null == prev) { - updated = true; - log.info("subscription changed, add new topic, group: {} {}", - this.groupName, - sub.toString()); - } - } else if (sub.getSubVersion() > old.getSubVersion()) { - if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) { - log.info("subscription changed, group: {} OLD: {} NEW: {}", - this.groupName, - old.toString(), - sub.toString() - ); - } - - this.subscriptionTable.put(sub.getTopic(), sub); - } - } - - - Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, SubscriptionData> next = it.next(); - String oldTopic = next.getKey(); - - boolean exist = false; - for (SubscriptionData sub : subList) { - if (sub.getTopic().equals(oldTopic)) { - exist = true; - break; - } - } - - if (!exist) { - log.warn("subscription changed, group: {} remove topic {} {}", - this.groupName, - oldTopic, - next.getValue().toString() - ); - - it.remove(); - updated = true; - } - } - - this.lastUpdateTimestamp = System.currentTimeMillis(); - - return updated; - } - - - public Set<String> getSubscribeTopics() { - return subscriptionTable.keySet(); - } - - - public SubscriptionData findSubscriptionData(final String topic) { - return this.subscriptionTable.get(topic); - } - - - public ConsumeType getConsumeType() { - return consumeType; - } - - - public void setConsumeType(ConsumeType consumeType) { - this.consumeType = consumeType; - } - - - public MessageModel getMessageModel() { - return messageModel; - } - - - public void setMessageModel(MessageModel messageModel) { - this.messageModel = messageModel; - } - - - public String getGroupName() { - return groupName; - } - - - public long getLastUpdateTimestamp() { - return lastUpdateTimestamp; - } - - - public void setLastUpdateTimestamp(long lastUpdateTimestamp) { - this.lastUpdateTimestamp = lastUpdateTimestamp; - } - - - public ConsumeFromWhere getConsumeFromWhere() { - return consumeFromWhere; - } - - - public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) { - this.consumeFromWhere = consumeFromWhere; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerIdsChangeListener.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerIdsChangeListener.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerIdsChangeListener.java deleted file mode 100644 index 4da2eb3..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerIdsChangeListener.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.client; - -import io.netty.channel.Channel; - -import java.util.List; - - -/** - * @author shijia.wxr - */ -public interface ConsumerIdsChangeListener { - public void consumerIdsChanged(final String group, final List<Channel> channels); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerManager.java deleted file mode 100644 index 48e9673..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerManager.java +++ /dev/null @@ -1,187 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.client; - -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; -import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; -import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; -import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; -import com.alibaba.rocketmq.remoting.common.RemotingUtil; -import io.netty.channel.Channel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - - -/** - * @author shijia.wxr - */ -public class ConsumerManager { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; - private final ConcurrentHashMap<String/* Group */, ConsumerGroupInfo> consumerTable = - new ConcurrentHashMap<String, ConsumerGroupInfo>(1024); - private final ConsumerIdsChangeListener consumerIdsChangeListener; - - public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) { - this.consumerIdsChangeListener = consumerIdsChangeListener; - } - - public ClientChannelInfo findChannel(final String group, final String clientId) { - ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); - if (consumerGroupInfo != null) { - return consumerGroupInfo.findChannel(clientId); - } - return null; - } - - public SubscriptionData findSubscriptionData(final String group, final String topic) { - ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group); - if (consumerGroupInfo != null) { - return consumerGroupInfo.findSubscriptionData(topic); - } - - return null; - } - - public ConsumerGroupInfo getConsumerGroupInfo(final String group) { - return this.consumerTable.get(group); - } - - public int findSubscriptionDataCount(final String group) { - ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group); - if (consumerGroupInfo != null) { - return consumerGroupInfo.getSubscriptionTable().size(); - } - - return 0; - } - - public void doChannelCloseEvent(final String remoteAddr, final Channel channel) { - Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, ConsumerGroupInfo> next = it.next(); - ConsumerGroupInfo info = next.getValue(); - boolean removed = info.doChannelCloseEvent(remoteAddr, channel); - if (removed) { - if (info.getChannelInfoTable().isEmpty()) { - ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey()); - if (remove != null) { - log.info("unregister consumer ok, no any connection, and remove consumer group, {}", - next.getKey()); - } - } - - this.consumerIdsChangeListener.consumerIdsChanged(next.getKey(), info.getAllChannel()); - } - } - } - - public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, - ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, - final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) { - - ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); - if (null == consumerGroupInfo) { - ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere); - ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp); - consumerGroupInfo = prev != null ? prev : tmp; - } - - boolean r1 = - consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, - consumeFromWhere); - boolean r2 = consumerGroupInfo.updateSubscription(subList); - - if (r1 || r2) { - if (isNotifyConsumerIdsChangedEnable) { - this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel()); - } - } - - return r1 || r2; - } - - public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo, boolean isNotifyConsumerIdsChangedEnable) { - ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); - if (null != consumerGroupInfo) { - consumerGroupInfo.unregisterChannel(clientChannelInfo); - if (consumerGroupInfo.getChannelInfoTable().isEmpty()) { - ConsumerGroupInfo remove = this.consumerTable.remove(group); - if (remove != null) { - log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group); - } - } - if (isNotifyConsumerIdsChangedEnable) { - this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel()); - } - } - } - - public void scanNotActiveChannel() { - Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, ConsumerGroupInfo> next = it.next(); - String group = next.getKey(); - ConsumerGroupInfo consumerGroupInfo = next.getValue(); - ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = - consumerGroupInfo.getChannelInfoTable(); - - Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator(); - while (itChannel.hasNext()) { - Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next(); - ClientChannelInfo clientChannelInfo = nextChannel.getValue(); - long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp(); - if (diff > CHANNEL_EXPIRED_TIMEOUT) { - log.warn( - "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}", - RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group); - RemotingUtil.closeChannel(clientChannelInfo.getChannel()); - itChannel.remove(); - } - } - - if (channelInfoTable.isEmpty()) { - log.warn( - "SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}", - group); - it.remove(); - } - } - } - - public HashSet<String> queryTopicConsumeByWho(final String topic) { - HashSet<String> groups = new HashSet<String>(); - Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, ConsumerGroupInfo> entry = it.next(); - ConcurrentHashMap<String, SubscriptionData> subscriptionTable = - entry.getValue().getSubscriptionTable(); - if (subscriptionTable.containsKey(topic)) { - groups.add(entry.getKey()); - } - } - return groups; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java deleted file mode 100644 index 0095913..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.client; - -import com.alibaba.rocketmq.broker.BrokerController; -import io.netty.channel.Channel; - -import java.util.List; - - -/** - * @author shijia.wxr - */ -public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener { - private final BrokerController brokerController; - - - public DefaultConsumerIdsChangeListener(BrokerController brokerController) { - this.brokerController = brokerController; - } - - - @Override - public void consumerIdsChanged(String group, List<Channel> channels) { - if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) { - for (Channel chl : channels) { - this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group); - } - } - } -}
