http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java new file mode 100644 index 0000000..b2b6aed --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java @@ -0,0 +1,773 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package com.alibaba.rocketmq.broker;

import com.alibaba.rocketmq.broker.client.*;
import com.alibaba.rocketmq.broker.client.net.Broker2Client;
import com.alibaba.rocketmq.broker.client.rebalance.RebalanceLockManager;
import com.alibaba.rocketmq.broker.filtersrv.FilterServerManager;
import com.alibaba.rocketmq.broker.latency.BrokerFastFailure;
import com.alibaba.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import com.alibaba.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
import com.alibaba.rocketmq.broker.longpolling.PullRequestHoldService;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
import com.alibaba.rocketmq.broker.mqtrace.SendMessageHook;
import com.alibaba.rocketmq.broker.offset.ConsumerOffsetManager;
import com.alibaba.rocketmq.broker.out.BrokerOuterAPI;
import com.alibaba.rocketmq.broker.plugin.MessageStoreFactory;
import com.alibaba.rocketmq.broker.plugin.MessageStorePluginContext;
import com.alibaba.rocketmq.broker.processor.*;
import com.alibaba.rocketmq.broker.slave.SlaveSynchronize;
import com.alibaba.rocketmq.broker.subscription.SubscriptionGroupManager;
import com.alibaba.rocketmq.broker.topic.TopicConfigManager;
import com.alibaba.rocketmq.common.*;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.constant.PermName;
import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult;
import com.alibaba.rocketmq.common.protocol.RequestCode;
import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import com.alibaba.rocketmq.common.stats.MomentStatsItem;
import com.alibaba.rocketmq.remoting.RPCHook;
import com.alibaba.rocketmq.remoting.RemotingServer;
import com.alibaba.rocketmq.remoting.netty.*;
import com.alibaba.rocketmq.store.DefaultMessageStore;
import com.alibaba.rocketmq.store.MessageArrivingListener;
import com.alibaba.rocketmq.store.MessageStore;
import com.alibaba.rocketmq.store.config.BrokerRole;
import com.alibaba.rocketmq.store.config.MessageStoreConfig;
import com.alibaba.rocketmq.store.stats.BrokerStats;
import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;


/**
 * Owns the broker's components (configuration objects, client/consumer/topic
 * managers, the two remoting servers, the request thread pools and the message
 * store) and drives their lifecycle through {@link #initialize()},
 * {@link #start()} and {@link #shutdown()}.
 *
 * @author shijia.wxr
 */
public class BrokerController {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
    private static final Logger LOG_WATER_MARK = LoggerFactory.getLogger(LoggerName.WATER_MARK_LOGGER_NAME);
    private final BrokerConfig brokerConfig;
    private final NettyServerConfig nettyServerConfig;
    private final NettyClientConfig nettyClientConfig;
    private final MessageStoreConfig messageStoreConfig;
    private final ConsumerOffsetManager consumerOffsetManager;
    private final ConsumerManager consumerManager;
    private final ProducerManager producerManager;
    private final ClientHousekeepingService clientHousekeepingService;
    private final PullMessageProcessor pullMessageProcessor;
    private final PullRequestHoldService pullRequestHoldService;
    private final MessageArrivingListener messageArrivingListener;
    private final Broker2Client broker2Client;
    private final SubscriptionGroupManager subscriptionGroupManager;
    private final ConsumerIdsChangeListener consumerIdsChangeListener;
    private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
    private final BrokerOuterAPI brokerOuterAPI;
    // Single thread that runs every periodic task scheduled by initialize() and start().
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
            "BrokerControllerScheduledThread"));
    private final SlaveSynchronize slaveSynchronize;
    private final BlockingQueue<Runnable> sendThreadPoolQueue;
    private final BlockingQueue<Runnable> pullThreadPoolQueue;
    private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
    // NOTE(review): allocated in the constructor but not handed to any executor in this class;
    // consumerManageExecutor is created with Executors.newFixedThreadPool instead. Confirm
    // whether it was meant to bound that pool's work queue.
    private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
    private final FilterServerManager filterServerManager;
    private final BrokerStatsManager brokerStatsManager;
    private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
    private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
    private MessageStore messageStore;
    private RemotingServer remotingServer;
    private RemotingServer fastRemotingServer;
    private TopicConfigManager topicConfigManager;
    private ExecutorService sendMessageExecutor;
    private ExecutorService pullMessageExecutor;
    private ExecutorService adminBrokerExecutor;
    private ExecutorService clientManageExecutor;
    private ExecutorService consumerManageExecutor;
    private boolean updateMasterHAServerAddrPeriodically = false;
    private BrokerStats brokerStats;
    private InetSocketAddress storeHost;
    private BrokerFastFailure brokerFastFailure;
    private Configuration configuration;

    /**
     * Builds the controller and its collaborators from the four configuration objects.
     * Several collaborators receive {@code this} while the constructor is still running.
     * No message store, remoting server or request executor is created here; that is
     * done by {@link #initialize()}.
     *
     * @param brokerConfig       broker-level settings
     * @param nettyServerConfig  settings for the remoting servers
     * @param nettyClientConfig  settings passed to {@link BrokerOuterAPI}
     * @param messageStoreConfig settings for the message store
     */
    public BrokerController(//
                            final BrokerConfig brokerConfig, //
                            final NettyServerConfig nettyServerConfig, //
                            final NettyClientConfig nettyClientConfig, //
                            final MessageStoreConfig messageStoreConfig //
    ) {
        this.brokerConfig = brokerConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.nettyClientConfig = nettyClientConfig;
        this.messageStoreConfig = messageStoreConfig;
        this.consumerOffsetManager = new ConsumerOffsetManager(this);
        this.topicConfigManager = new TopicConfigManager(this);
        this.pullMessageProcessor = new PullMessageProcessor(this);
        this.pullRequestHoldService = new PullRequestHoldService(this);
        this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
        this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
        this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
        this.producerManager = new ProducerManager();
        this.clientHousekeepingService = new ClientHousekeepingService(this);
        this.broker2Client = new Broker2Client(this);
        this.subscriptionGroupManager = new SubscriptionGroupManager(this);
        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
        this.filterServerManager = new FilterServerManager(this);

        if (this.brokerConfig.getNamesrvAddr() != null) {
            this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
            log.info("user specfied name server address: {}", this.brokerConfig.getNamesrvAddr());
        }

        this.slaveSynchronize = new SlaveSynchronize(this);

        this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());

        this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
        this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
        this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());

        this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
        this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));

        this.brokerFastFailure = new BrokerFastFailure(this);
        this.configuration = new Configuration(
                log,
                BrokerPathConfigHelper.getBrokerConfigPath(),
                this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
        );
    }

    public BrokerConfig getBrokerConfig() {
        return brokerConfig;
    }

    public NettyServerConfig getNettyServerConfig() {
        return nettyServerConfig;
    }

    public BlockingQueue<Runnable> getPullThreadPoolQueue() {
        return pullThreadPoolQueue;
    }

    /**
     * Loads persisted state, creates the message store, the two remoting servers and the
     * request executors, registers the request processors and schedules the periodic
     * maintenance tasks.
     *
     * <p>Each load step runs only if every previous step succeeded. The servers,
     * executors and scheduled tasks are created only when all loads succeeded.
     *
     * @return {@code true} if every load step succeeded, {@code false} otherwise
     * @throws CloneNotSupportedException if the netty server configuration cannot be cloned
     *                                    for the second ("fast") remoting server
     */
    public boolean initialize() throws CloneNotSupportedException {
        boolean result = true;

        result = result && this.topicConfigManager.load();

        result = result && this.consumerOffsetManager.load();
        result = result && this.subscriptionGroupManager.load();

        if (result) {
            try {
                this.messageStore =
                        new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                                this.brokerConfig);
                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                //load plugin
                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                this.messageStore = MessageStoreFactory.build(context, this.messageStore);
            } catch (IOException e) {
                result = false;
                // Report through the broker logger (with stack trace) rather than stderr.
                log.error("Failed to create the message store", e);
            }
        }

        // Short-circuits when result is already false, so messageStore is not touched
        // if its construction failed above.
        result = result && this.messageStore.load();

        if (result) {
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
            // The second server uses a copy of the configuration and listens on (main port - 2).
            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
            fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
            this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
            this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getSendMessageThreadPoolNums(),
                    this.brokerConfig.getSendMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.sendThreadPoolQueue,
                    new ThreadFactoryImpl("SendMessageThread_"));

            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getPullMessageThreadPoolNums(),
                    this.brokerConfig.getPullMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.pullThreadPoolQueue,
                    new ThreadFactoryImpl("PullMessageThread_"));

            this.adminBrokerExecutor =
                    Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                            "AdminBrokerThread_"));

            this.clientManageExecutor = new ThreadPoolExecutor(
                    this.brokerConfig.getClientManageThreadPoolNums(),
                    this.brokerConfig.getClientManageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.clientManagerThreadPoolQueue,
                    new ThreadFactoryImpl("ClientManageThread_"));

            this.consumerManageExecutor =
                    Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                            "ConsumerManageThread_"));

            this.registerProcessor();


            // TODO remove in future
            final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
            final long period = 1000 * 60 * 60 * 24;
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.getBrokerStats().record();
                    } catch (Throwable e) {
                        log.error("schedule record error.", e);
                    }
                }
            }, initialDelay, period, TimeUnit.MILLISECONDS);

            // Persist consumer offsets at the configured interval, first run after 10 s.
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);


            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.protectBroker();
                    } catch (Exception e) {
                        log.error("protectBroker error.", e);
                    }
                }
            }, 3, 3, TimeUnit.MINUTES);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.printWaterMark();
                    } catch (Exception e) {
                        log.error("printWaterMark error.", e);
                    }
                }
            }, 10, 1, TimeUnit.SECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                    } catch (Throwable e) {
                        log.error("schedule dispatchBehindBytes error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

            // A configured name server address wins; otherwise optionally poll the address server.
            if (this.brokerConfig.getNamesrvAddr() != null) {
                this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
            } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                        } catch (Throwable e) {
                            log.error("ScheduledTask fetchNameServerAddr exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
            }

            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                // Use the configured HA master address when it is at least 6 characters long;
                // otherwise take it from the register-broker result (see registerBrokerAll).
                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                } else {
                    this.updateMasterHAServerAddrPeriodically = true;
                }

                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.slaveSynchronize.syncAll();
                        } catch (Throwable e) {
                            log.error("ScheduledTask syncAll slave exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            } else {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printMasterAndSlaveDiff();
                        } catch (Throwable e) {
                            log.error("schedule printMasterAndSlaveDiff error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
        }

        return result;
    }

    /**
     * Registers the request processors and their executors on the remoting servers.
     * Every processor except the pull-message one is registered on both servers;
     * {@code PULL_MESSAGE} is registered on the main server only.
     */
    public void registerProcessor() {
        /**
         * SendMessageProcessor
         */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        /**
         * PullMessageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
        this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

        /**
         * QueryMessageProcessor
         */
        NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor);

        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor);

        /**
         * ClientManageProcessor
         */
        ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);

        this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);

        /**
         * ConsumerManageProcessor
         */
        ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

        this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);


        /**
         * EndTransactionProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);

        /**
         * Default
         */
        AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
        this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
        this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    }

    public BrokerStats getBrokerStats() {
        return brokerStats;
    }

    public void setBrokerStats(BrokerStats brokerStats) {
        this.brokerStats = brokerStats;
    }

    /**
     * When {@code disableConsumeIfConsumerReadSlowly} is enabled, disables consumption for
     * every consumer group whose fall-behind size exceeds the configured threshold.
     *
     * <p>The group name is taken from the third {@code '@'}-separated part of the stats key.
     * Keys with fewer than three parts are logged and skipped, so one malformed key does
     * not abort the scan of the remaining entries.
     */
    public void protectBroker() {
        if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {
            final Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator();
            while (it.hasNext()) {
                final Map.Entry<String, MomentStatsItem> next = it.next();
                final long fallBehindBytes = next.getValue().getValue().get();
                if (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) {
                    final String statsKey = next.getValue().getStatsKey();
                    final String[] split = statsKey.split("@");
                    if (split.length < 3) {
                        LOG_PROTECTION.warn("[PROTECT_BROKER] unexpected stats key [{}], skip it", statsKey);
                        continue;
                    }
                    final String group = split[2];
                    LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", group, fallBehindBytes);
                    this.subscriptionGroupManager.disableConsume(group);
                }
            }
        }
    }

    /**
     * Computes how long the task at the head of {@code q} has been waiting: the message
     * store's current time minus the task's creation timestamp, never less than 0.
     *
     * @param q the executor work queue to inspect
     * @return the waiting time of the head task, or 0 if the queue is empty or the head
     *         cannot be converted to a {@link RequestTask}
     */
    public long headSlowTimeMills(BlockingQueue<Runnable> q) {
        long slowTimeMills = 0;
        final Runnable peek = q.peek();
        if (peek != null) {
            RequestTask rt = BrokerFastFailure.castRunnable(peek);
            // NOTE(review): castRunnable is not visible here; guard in case it yields null
            // for a runnable that is not a request task.
            if (rt != null) {
                slowTimeMills = this.messageStore.now() - rt.getCreateTimestamp();
            }
        }

        if (slowTimeMills < 0) {
            slowTimeMills = 0;
        }

        return slowTimeMills;
    }

    public long headSlowTimeMills4SendThreadPoolQueue() {
        return this.headSlowTimeMills(this.sendThreadPoolQueue);
    }

    public long headSlowTimeMills4PullThreadPoolQueue() {
        return this.headSlowTimeMills(this.pullThreadPoolQueue);
    }

    /**
     * Logs the size and head waiting time of the send and pull queues to the
     * water-mark logger.
     */
    public void printWaterMark() {
        LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
        LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
    }

    public MessageStore getMessageStore() {
        return messageStore;
    }

    public void setMessageStore(MessageStore messageStore) {
        this.messageStore = messageStore;
    }

    private void printMasterAndSlaveDiff() {
        long diff = this.messageStore.slaveFallBehindMuch();

        // XXX: warn and notify me
        log.info("slave fall behind master, how much, {} bytes", diff);
    }

    public Broker2Client getBroker2Client() {
        return broker2Client;
    }

    public ConsumerManager getConsumerManager() {
        return consumerManager;
    }

    public ConsumerOffsetManager getConsumerOffsetManager() {
        return consumerOffsetManager;
    }

    public MessageStoreConfig getMessageStoreConfig() {
        return messageStoreConfig;
    }

    public ProducerManager getProducerManager() {
        return producerManager;
    }

    public void setFastRemotingServer(RemotingServer fastRemotingServer) {
        this.fastRemotingServer = fastRemotingServer;
    }

    public PullMessageProcessor getPullMessageProcessor() {
        return pullMessageProcessor;
    }

    public PullRequestHoldService getPullRequestHoldService() {
        return pullRequestHoldService;
    }

    public SubscriptionGroupManager getSubscriptionGroupManager() {
        return subscriptionGroupManager;
    }

    /**
     * Stops the broker's components, unregisters the broker from the name servers and
     * persists the consumer offsets.
     *
     * <p>Every request executor created by {@link #initialize()} is shut down, including
     * the client-manage and consumer-manage pools. If the calling thread is interrupted
     * while waiting for the scheduler to terminate, the interrupt flag is restored and the
     * remaining shutdown steps still run.
     */
    public void shutdown() {
        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.shutdown();
        }

        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.shutdown();
        }

        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.shutdown();
        }

        if (this.remotingServer != null) {
            this.remotingServer.shutdown();
        }

        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.shutdown();
        }

        if (this.messageStore != null) {
            this.messageStore.shutdown();
        }

        this.scheduledExecutorService.shutdown();
        try {
            this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Keep shutting down, but do not lose the interrupt for the caller.
            Thread.currentThread().interrupt();
        }

        this.unregisterBrokerAll();

        if (this.sendMessageExecutor != null) {
            this.sendMessageExecutor.shutdown();
        }

        if (this.pullMessageExecutor != null) {
            this.pullMessageExecutor.shutdown();
        }

        if (this.adminBrokerExecutor != null) {
            this.adminBrokerExecutor.shutdown();
        }

        // These two pools are created in initialize() as well; without shutting them down
        // their worker threads outlive the controller.
        if (this.clientManageExecutor != null) {
            this.clientManageExecutor.shutdown();
        }

        if (this.consumerManageExecutor != null) {
            this.consumerManageExecutor.shutdown();
        }

        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.shutdown();
        }

        this.consumerOffsetManager.persist();

        if (this.filterServerManager != null) {
            this.filterServerManager.shutdown();
        }

        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.shutdown();
        }
    }

    private void unregisterBrokerAll() {
        this.brokerOuterAPI.unregisterBrokerAll(
                this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId());
    }

    /**
     * @return the broker address as {@code brokerIP1:listenPort}
     */
    public String getBrokerAddr() {
        return this.brokerConfig.getBrokerIP1() + ":" + this.nettyServerConfig.getListenPort();
    }

    /**
     * Starts the components that exist, registers the broker once, and then schedules a
     * re-registration every 30 seconds (first run after 10 seconds).
     *
     * @throws Exception if any component fails to start
     */
    public void start() throws Exception {
        if (this.messageStore != null) {
            this.messageStore.start();
        }

        if (this.remotingServer != null) {
            this.remotingServer.start();
        }

        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.start();
        }

        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }

        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }

        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.start();
        }

        if (this.filterServerManager != null) {
            this.filterServerManager.start();
        }

        this.registerBrokerAll(true, false);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false);
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);

        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.start();
        }

        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.start();
        }
    }

    /**
     * Registers this broker and its topic configuration through {@link BrokerOuterAPI},
     * then applies the returned HA master address, master address and order-topic
     * configuration.
     *
     * <p>If the broker is not both readable and writeable, each topic is sent with the
     * broker's permission in place of the topic's own.
     *
     * @param checkOrderConfig whether to apply the returned KV table as order-topic configuration
     * @param oneway           passed through to {@code BrokerOuterAPI.registerBrokerAll}
     */
    public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
                || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
            ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
            for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                TopicConfig tmp =
                        new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                                this.brokerConfig.getBrokerPermission());
                topicConfigTable.put(topicConfig.getTopicName(), tmp);
            }
            topicConfigWrapper.setTopicConfigTable(topicConfigTable);
        }

        RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(
                this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.getHAServerAddr(),
                topicConfigWrapper,
                this.filterServerManager.buildNewFilterServerList(),
                oneway,
                this.brokerConfig.getRegisterBrokerTimeoutMills());

        if (registerBrokerResult != null) {
            if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
            }

            this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

            if (checkOrderConfig) {
                this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
            }
        }
    }

    public TopicConfigManager getTopicConfigManager() {
        return topicConfigManager;
    }

    public void setTopicConfigManager(TopicConfigManager topicConfigManager) {
        this.topicConfigManager = topicConfigManager;
    }

    /**
     * @return the HA server address as {@code brokerIP2:haListenPort}
     */
    public String getHAServerAddr() {
        return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort();
    }

    public RebalanceLockManager getRebalanceLockManager() {
        return rebalanceLockManager;
    }

    public SlaveSynchronize getSlaveSynchronize() {
        return slaveSynchronize;
    }

    public ExecutorService getPullMessageExecutor() {
        return pullMessageExecutor;
    }

    public void setPullMessageExecutor(ExecutorService pullMessageExecutor) {
        this.pullMessageExecutor = pullMessageExecutor;
    }

    public BlockingQueue<Runnable> getSendThreadPoolQueue() {
        return sendThreadPoolQueue;
    }

    public FilterServerManager getFilterServerManager() {
        return filterServerManager;
    }

    public BrokerStatsManager getBrokerStatsManager() {
        return brokerStatsManager;
    }

    public List<SendMessageHook> getSendMessageHookList() {
        return sendMessageHookList;
    }

    public void registerSendMessageHook(final SendMessageHook hook) {
        this.sendMessageHookList.add(hook);
        log.info("register SendMessageHook Hook, {}", hook.hookName());
    }

    public List<ConsumeMessageHook> getConsumeMessageHookList() {
        return consumeMessageHookList;
    }

    public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
        this.consumeMessageHookList.add(hook);
        log.info("register ConsumeMessageHook Hook, {}", hook.hookName());
    }

    /**
     * Registers an RPC hook on the main remoting server and, when present, on the fast
     * remoting server, so that requests arriving on either port pass through the hook.
     *
     * @param rpcHook the hook to register
     */
    public void registerServerRPCHook(RPCHook rpcHook) {
        getRemotingServer().registerRPCHook(rpcHook);
        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.registerRPCHook(rpcHook);
        }
    }

    public RemotingServer getRemotingServer() {
        return remotingServer;
    }

    public void setRemotingServer(RemotingServer remotingServer) {
        this.remotingServer = remotingServer;
    }

    public void registerClientRPCHook(RPCHook rpcHook) {
        this.getBrokerOuterAPI().registerRPCHook(rpcHook);
    }

    public BrokerOuterAPI getBrokerOuterAPI() {
        return brokerOuterAPI;
    }

    public InetSocketAddress getStoreHost() {
        return storeHost;
    }

    public void setStoreHost(InetSocketAddress storeHost) {
        this.storeHost = storeHost;
    }

    public Configuration getConfiguration() {
        return this.configuration;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerPathConfigHelper.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerPathConfigHelper.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerPathConfigHelper.java new file mode 100644 index 0000000..055e8dc --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerPathConfigHelper.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package com.alibaba.rocketmq.broker;

import java.io.File;


/**
 * Static helpers that build the paths of the broker's configuration files.
 * The broker properties path defaults to
 * {@code <user.home>/store/config/broker.properties} and can be replaced at runtime.
 */
public class BrokerPathConfigHelper {
    private static String brokerConfigPath =
            inConfigDir(System.getProperty("user.home") + File.separator + "store", "broker.properties");


    /**
     * @return the current path of the broker properties file
     */
    public static String getBrokerConfigPath() {
        return brokerConfigPath;
    }


    /**
     * Replaces the path of the broker properties file.
     *
     * @param path the new path; stored as given
     */
    public static void setBrokerConfigPath(String path) {
        brokerConfigPath = path;
    }


    /**
     * @param rootDir the store root directory
     * @return {@code <rootDir>/config/topics.json}
     */
    public static String getTopicConfigPath(final String rootDir) {
        return inConfigDir(rootDir, "topics.json");
    }


    /**
     * @param rootDir the store root directory
     * @return {@code <rootDir>/config/consumerOffset.json}
     */
    public static String getConsumerOffsetPath(final String rootDir) {
        return inConfigDir(rootDir, "consumerOffset.json");
    }


    /**
     * @param rootDir the store root directory
     * @return {@code <rootDir>/config/subscriptionGroup.json}
     */
    public static String getSubscriptionGroupPath(final String rootDir) {
        return inConfigDir(rootDir, "subscriptionGroup.json");
    }


    // Joins rootDir, the "config" directory and fileName with the platform separator.
    private static String inConfigDir(final String rootDir, final String fileName) {
        return rootDir + File.separator + "config" + File.separator + fileName;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java new file mode 100644 index 0000000..7e81117 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.broker; + +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.joran.JoranConfigurator; +import com.alibaba.rocketmq.common.BrokerConfig; +import com.alibaba.rocketmq.common.MQVersion; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.remoting.common.RemotingUtil; +import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; +import com.alibaba.rocketmq.remoting.netty.NettyServerConfig; +import com.alibaba.rocketmq.remoting.netty.NettySystemConfig; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import com.alibaba.rocketmq.srvutil.ServerUtil; +import com.alibaba.rocketmq.store.config.BrokerRole; +import com.alibaba.rocketmq.store.config.MessageStoreConfig; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * @author shijia.wxr + */ +public class BrokerStartup { + public static Properties properties = null; + public static CommandLine commandLine = null; + public static String configFile = null; + public static Logger log; + + public static void main(String[] args) { + start(createBrokerController(args)); + } + + public static 
BrokerController start(BrokerController controller) { + try { + controller.start(); + String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", " + + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); + + if (null != controller.getBrokerConfig().getNamesrvAddr()) { + tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr(); + } + + log.info(tip); + return controller; + } catch (Throwable e) { + e.printStackTrace(); + System.exit(-1); + } + + return null; + } + + public static BrokerController createBrokerController(String[] args) { + System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); + + if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { + NettySystemConfig.socketSndbufSize = 131072; + } + + if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { + NettySystemConfig.socketRcvbufSize = 131072; + } + + try { + //PackageConflictDetect.detectFastjson(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options), + new PosixParser()); + if (null == commandLine) { + System.exit(-1); + } + + final BrokerConfig brokerConfig = new BrokerConfig(); + final NettyServerConfig nettyServerConfig = new NettyServerConfig(); + final NettyClientConfig nettyClientConfig = new NettyClientConfig(); + nettyServerConfig.setListenPort(10911); + final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + + if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) { + int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10; + messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio); + } + + if (commandLine.hasOption('p')) { + MixAll.printObjectProperties(null, brokerConfig); + MixAll.printObjectProperties(null, 
nettyServerConfig); + MixAll.printObjectProperties(null, nettyClientConfig); + MixAll.printObjectProperties(null, messageStoreConfig); + System.exit(0); + } else if (commandLine.hasOption('m')) { + MixAll.printObjectProperties(null, brokerConfig, true); + MixAll.printObjectProperties(null, nettyServerConfig, true); + MixAll.printObjectProperties(null, nettyClientConfig, true); + MixAll.printObjectProperties(null, messageStoreConfig, true); + System.exit(0); + } + + if (commandLine.hasOption('c')) { + String file = commandLine.getOptionValue('c'); + if (file != null) { + configFile = file; + InputStream in = new BufferedInputStream(new FileInputStream(file)); + properties = new Properties(); + properties.load(in); + + parsePropertie2SystemEnv(properties); + MixAll.properties2Object(properties, brokerConfig); + MixAll.properties2Object(properties, nettyServerConfig); + MixAll.properties2Object(properties, nettyClientConfig); + MixAll.properties2Object(properties, messageStoreConfig); + + BrokerPathConfigHelper.setBrokerConfigPath(file); + in.close(); + } + } + + MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig); + + if (null == brokerConfig.getRocketmqHome()) { + System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV + + " variable in your environment to match the location of the RocketMQ installation"); + System.exit(-2); + } + + String namesrvAddr = brokerConfig.getNamesrvAddr(); + if (null != namesrvAddr) { + try { + String[] addrArray = namesrvAddr.split(";"); + if (addrArray != null) { + for (String addr : addrArray) { + RemotingUtil.string2SocketAddress(addr); + } + } + } catch (Exception e) { + System.out.printf( + "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n", + namesrvAddr); + System.exit(-3); + } + } + + + switch (messageStoreConfig.getBrokerRole()) { + case ASYNC_MASTER: + case SYNC_MASTER: + brokerConfig.setBrokerId(MixAll.MASTER_ID); + break; + case 
SLAVE: + if (brokerConfig.getBrokerId() <= 0) { + System.out.printf("Slave's brokerId must be > 0"); + System.exit(-3); + } + + break; + default: + break; + } + + messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1); + LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); + JoranConfigurator configurator = new JoranConfigurator(); + configurator.setContext(lc); + lc.reset(); + configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml"); + log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + + MixAll.printObjectProperties(log, brokerConfig); + MixAll.printObjectProperties(log, nettyServerConfig); + MixAll.printObjectProperties(log, nettyClientConfig); + MixAll.printObjectProperties(log, messageStoreConfig); + + final BrokerController controller = new BrokerController(// + brokerConfig, // + nettyServerConfig, // + nettyClientConfig, // + messageStoreConfig); + // remember all configs to prevent discard + controller.getConfiguration().registerConfig(properties); + + boolean initResult = controller.initialize(); + if (!initResult) { + controller.shutdown(); + System.exit(-3); + } + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + private volatile boolean hasShutdown = false; + private AtomicInteger shutdownTimes = new AtomicInteger(0); + + @Override + public void run() { + synchronized (this) { + log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet()); + if (!this.hasShutdown) { + this.hasShutdown = true; + long begineTime = System.currentTimeMillis(); + controller.shutdown(); + long consumingTimeTotal = System.currentTimeMillis() - begineTime; + log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal); + } + } + } + }, "ShutdownHook")); + + return controller; + } catch (Throwable e) { + e.printStackTrace(); + System.exit(-1); + } + + return null; + } + + private static void parsePropertie2SystemEnv(Properties properties) { + if 
(properties == null) { + return; + } + String rmqAddressServerDomain = properties.getProperty("rmqAddressServerDomain", "jmenv.tbsite.net"); + String rmqAddressServerSubGroup = properties.getProperty("rmqAddressServerSubGroup", "nsaddr"); + System.setProperty("rocketmq.namesrv.domain", rmqAddressServerDomain); + System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup); + } + + public static Options buildCommandlineOptions(final Options options) { + Option opt = new Option("c", "configFile", true, "Broker config properties file"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("p", "printConfigItem", false, "Print all config item"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("m", "printImportantConfig", false, "Print important config item"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientChannelInfo.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientChannelInfo.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientChannelInfo.java new file mode 100644 index 0000000..babf4b7 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientChannelInfo.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.broker.client; + +import com.alibaba.rocketmq.remoting.protocol.LanguageCode; +import io.netty.channel.Channel; + + +/** + * @author shijia.wxr + */ +public class ClientChannelInfo { + private final Channel channel; + private final String clientId; + private final LanguageCode language; + private final int version; + private volatile long lastUpdateTimestamp = System.currentTimeMillis(); + + + public ClientChannelInfo(Channel channel) { + this(channel, null, null, 0); + } + + + public ClientChannelInfo(Channel channel, String clientId, LanguageCode language, int version) { + this.channel = channel; + this.clientId = clientId; + this.language = language; + this.version = version; + } + + + public Channel getChannel() { + return channel; + } + + + public String getClientId() { + return clientId; + } + + + public LanguageCode getLanguage() { + return language; + } + + + public int getVersion() { + return version; + } + + + public long getLastUpdateTimestamp() { + return lastUpdateTimestamp; + } + + + public void setLastUpdateTimestamp(long lastUpdateTimestamp) { + this.lastUpdateTimestamp = lastUpdateTimestamp; + } + + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((channel == null) ? 0 : channel.hashCode()); + result = prime * result + ((clientId == null) ? 0 : clientId.hashCode()); + result = prime * result + ((language == null) ? 
0 : language.hashCode()); + result = prime * result + (int) (lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32)); + result = prime * result + version; + return result; + } + + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ClientChannelInfo other = (ClientChannelInfo) obj; + if (channel == null) { + if (other.channel != null) + return false; + } else if (this.channel != other.channel) { + return false; + } + + return true; + } + + + @Override + public String toString() { + return "ClientChannelInfo [channel=" + channel + ", clientId=" + clientId + ", language=" + language + + ", version=" + version + ", lastUpdateTimestamp=" + lastUpdateTimestamp + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientHousekeepingService.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientHousekeepingService.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientHousekeepingService.java new file mode 100644 index 0000000..4ac7532 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientHousekeepingService.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.broker.client; + +import com.alibaba.rocketmq.broker.BrokerController; +import com.alibaba.rocketmq.common.ThreadFactoryImpl; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.remoting.ChannelEventListener; +import io.netty.channel.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + + +/** + * @author shijia.wxr + */ +public class ClientHousekeepingService implements ChannelEventListener { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private final BrokerController brokerController; + + private ScheduledExecutorService scheduledExecutorService = Executors + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread")); + + + public ClientHousekeepingService(final BrokerController brokerController) { + this.brokerController = brokerController; + } + + + public void start() { + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + ClientHousekeepingService.this.scanExceptionChannel(); + } catch (Exception e) { + log.error("", e); + } + } + }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS); + } + + private void scanExceptionChannel() { + this.brokerController.getProducerManager().scanNotActiveChannel(); + this.brokerController.getConsumerManager().scanNotActiveChannel(); + 
this.brokerController.getFilterServerManager().scanNotActiveChannel(); + } + + public void shutdown() { + this.scheduledExecutorService.shutdown(); + } + + @Override + public void onChannelConnect(String remoteAddr, Channel channel) { + + } + + + @Override + public void onChannelClose(String remoteAddr, Channel channel) { + this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); + this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); + this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); + } + + + @Override + public void onChannelException(String remoteAddr, Channel channel) { + this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); + this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); + this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); + } + + + @Override + public void onChannelIdle(String remoteAddr, Channel channel) { + this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); + this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); + this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java new file mode 100644 index 0000000..410b703 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.broker.client; + +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; +import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; +import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; +import io.netty.channel.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * @author shijia.wxr + */ +public class ConsumerGroupInfo { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private final String groupName; + private final ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable = + new ConcurrentHashMap<String, SubscriptionData>(); + private final ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = + new ConcurrentHashMap<Channel, ClientChannelInfo>(16); + private volatile ConsumeType consumeType; + private volatile MessageModel messageModel; + private volatile ConsumeFromWhere consumeFromWhere; + private 
volatile long lastUpdateTimestamp = System.currentTimeMillis(); + + + public ConsumerGroupInfo(String groupName, ConsumeType consumeType, MessageModel messageModel, + ConsumeFromWhere consumeFromWhere) { + this.groupName = groupName; + this.consumeType = consumeType; + this.messageModel = messageModel; + this.consumeFromWhere = consumeFromWhere; + } + + + public ClientChannelInfo findChannel(final String clientId) { + Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<Channel, ClientChannelInfo> next = it.next(); + if (next.getValue().getClientId().equals(clientId)) { + return next.getValue(); + } + } + + return null; + } + + + public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() { + return subscriptionTable; + } + + + public ConcurrentHashMap<Channel, ClientChannelInfo> getChannelInfoTable() { + return channelInfoTable; + } + + + public List<Channel> getAllChannel() { + List<Channel> result = new ArrayList<Channel>(); + + result.addAll(this.channelInfoTable.keySet()); + + return result; + } + + + public List<String> getAllClientId() { + List<String> result = new ArrayList<String>(); + + Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator(); + + while (it.hasNext()) { + Entry<Channel, ClientChannelInfo> entry = it.next(); + ClientChannelInfo clientChannelInfo = entry.getValue(); + result.add(clientChannelInfo.getClientId()); + } + + return result; + } + + + public void unregisterChannel(final ClientChannelInfo clientChannelInfo) { + ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel()); + if (old != null) { + log.info("unregister a consumer[{}] from consumerGroupInfo {}", this.groupName, old.toString()); + } + } + + + public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) { + final ClientChannelInfo info = this.channelInfoTable.remove(channel); + if (info != null) { + 
log.warn( + "NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}", + info.toString(), groupName); + return true; + } + + return false; + } + + public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType, + MessageModel messageModel, ConsumeFromWhere consumeFromWhere) { + boolean updated = false; + this.consumeType = consumeType; + this.messageModel = messageModel; + this.consumeFromWhere = consumeFromWhere; + + ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel()); + if (null == infoOld) { + ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew); + if (null == prev) { + log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType, + messageModel, infoNew.toString()); + updated = true; + } + + infoOld = infoNew; + } else { + if (!infoOld.getClientId().equals(infoNew.getClientId())) { + log.error("[BUG] consumer channel exist in broker, but clientId not equal. 
GROUP: {} OLD: {} NEW: {} ", + this.groupName, + infoOld.toString(), + infoNew.toString()); + this.channelInfoTable.put(infoNew.getChannel(), infoNew); + } + } + + this.lastUpdateTimestamp = System.currentTimeMillis(); + infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp); + + return updated; + } + + + public boolean updateSubscription(final Set<SubscriptionData> subList) { + boolean updated = false; + + for (SubscriptionData sub : subList) { + SubscriptionData old = this.subscriptionTable.get(sub.getTopic()); + if (old == null) { + SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub); + if (null == prev) { + updated = true; + log.info("subscription changed, add new topic, group: {} {}", + this.groupName, + sub.toString()); + } + } else if (sub.getSubVersion() > old.getSubVersion()) { + if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) { + log.info("subscription changed, group: {} OLD: {} NEW: {}", + this.groupName, + old.toString(), + sub.toString() + ); + } + + this.subscriptionTable.put(sub.getTopic(), sub); + } + } + + + Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, SubscriptionData> next = it.next(); + String oldTopic = next.getKey(); + + boolean exist = false; + for (SubscriptionData sub : subList) { + if (sub.getTopic().equals(oldTopic)) { + exist = true; + break; + } + } + + if (!exist) { + log.warn("subscription changed, group: {} remove topic {} {}", + this.groupName, + oldTopic, + next.getValue().toString() + ); + + it.remove(); + updated = true; + } + } + + this.lastUpdateTimestamp = System.currentTimeMillis(); + + return updated; + } + + + public Set<String> getSubscribeTopics() { + return subscriptionTable.keySet(); + } + + + public SubscriptionData findSubscriptionData(final String topic) { + return this.subscriptionTable.get(topic); + } + + + public ConsumeType getConsumeType() { + return consumeType; + } + + + public void 
setConsumeType(ConsumeType consumeType) { + this.consumeType = consumeType; + } + + + public MessageModel getMessageModel() { + return messageModel; + } + + + public void setMessageModel(MessageModel messageModel) { + this.messageModel = messageModel; + } + + + public String getGroupName() { + return groupName; + } + + + public long getLastUpdateTimestamp() { + return lastUpdateTimestamp; + } + + + public void setLastUpdateTimestamp(long lastUpdateTimestamp) { + this.lastUpdateTimestamp = lastUpdateTimestamp; + } + + + public ConsumeFromWhere getConsumeFromWhere() { + return consumeFromWhere; + } + + + public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) { + this.consumeFromWhere = consumeFromWhere; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerIdsChangeListener.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerIdsChangeListener.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerIdsChangeListener.java new file mode 100644 index 0000000..4da2eb3 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerIdsChangeListener.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.broker.client; + +import io.netty.channel.Channel; + +import java.util.List; + + +/** + * @author shijia.wxr + */ +public interface ConsumerIdsChangeListener { + public void consumerIdsChanged(final String group, final List<Channel> channels); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerManager.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerManager.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerManager.java new file mode 100644 index 0000000..48e9673 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerManager.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.broker.client; + +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; +import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; +import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; +import com.alibaba.rocketmq.remoting.common.RemotingUtil; +import io.netty.channel.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * @author shijia.wxr + */ +public class ConsumerManager { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; + private final ConcurrentHashMap<String/* Group */, ConsumerGroupInfo> consumerTable = + new ConcurrentHashMap<String, ConsumerGroupInfo>(1024); + private final ConsumerIdsChangeListener consumerIdsChangeListener; + + public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) { + this.consumerIdsChangeListener = consumerIdsChangeListener; + } + + public ClientChannelInfo findChannel(final String group, final String clientId) { + ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); + if (consumerGroupInfo != null) { + return consumerGroupInfo.findChannel(clientId); + } + return null; + } + + public SubscriptionData findSubscriptionData(final String group, final String topic) { + ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group); + if (consumerGroupInfo != null) { + return consumerGroupInfo.findSubscriptionData(topic); + } + + return null; + 
}

    /**
     * Looks up the entry stored in {@code consumerTable} under the given group name.
     *
     * @param group consumer group name used as the table key
     * @return whatever the table holds for {@code group}; may be {@code null}
     *         (see the null check in {@link #findSubscriptionDataCount(String)})
     */
    public ConsumerGroupInfo getConsumerGroupInfo(final String group) {
        final ConsumerGroupInfo groupInfo = this.consumerTable.get(group);
        return groupInfo;
    }

    /**
     * Reports the size of a group's subscription table.
     *
     * @param group consumer group name
     * @return the number of entries in the group's subscription table, or {@code 0}
     *         when {@link #getConsumerGroupInfo(String)} yields {@code null}
     */
    public int findSubscriptionDataCount(final String group) {
        final ConsumerGroupInfo groupInfo = this.getConsumerGroupInfo(group);
        return groupInfo == null ? 0 : groupInfo.getSubscriptionTable().size();
    }

    /**
     * Forwards a channel-close event to every group currently in {@code consumerTable}.
     * <p>
     * For each group whose {@code doChannelCloseEvent} returns {@code true}: if the group's
     * channel table is empty afterwards, the group is removed from {@code consumerTable}
     * (an info line is logged when that removal returns a non-null value); then the
     * {@code consumerIdsChangeListener} is invoked with the group's name and the result of
     * {@code getAllChannel()}.
     *
     * @param remoteAddr remote address handed through to each group's handler
     * @param channel    the channel handed through to each group's handler
     */
    public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
        for (Entry<String, ConsumerGroupInfo> entry : this.consumerTable.entrySet()) {
            final ConsumerGroupInfo groupInfo = entry.getValue();
            if (!groupInfo.doChannelCloseEvent(remoteAddr, channel)) {
                // This group did not report a removal; nothing to clean up or announce.
                continue;
            }

            final String groupName = entry.getKey();
            if (groupInfo.getChannelInfoTable().isEmpty()
                && this.consumerTable.remove(groupName) != null) {
                log.info("unregister consumer ok, no any connection, and remove consumer group, {}",
                    groupName);
            }

            this.consumerIdsChangeListener.consumerIdsChanged(groupName, groupInfo.getAllChannel());
        }
    }

    /**
     * Registers (or refreshes) a consumer channel and its subscriptions under a group.
     * <p>
     * When {@code consumerTable} has no entry for {@code group}, a new
     * {@code ConsumerGroupInfo} is inserted with {@code putIfAbsent}; if another value is
     * already present at that point, that existing value is used instead. Both
     * {@code updateChannel} and {@code updateSubscription} are always called.
     *
     * @param group                            consumer group name
     * @param clientChannelInfo                channel information passed to {@code updateChannel}
     * @param consumeType                      passed to the group constructor and {@code updateChannel}
     * @param messageModel                     passed to the group constructor and {@code updateChannel}
     * @param consumeFromWhere                 passed to the group constructor and {@code updateChannel}
     * @param subList                          subscriptions passed to {@code updateSubscription}
     * @param isNotifyConsumerIdsChangedEnable when {@code true} and either update returned
     *                                         {@code true}, the change listener is invoked
     * @return {@code true} if {@code updateChannel} or {@code updateSubscription} returned {@code true}
     */
    public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
                                    ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
                                    final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
        ConsumerGroupInfo groupInfo = this.consumerTable.get(group);
        if (groupInfo == null) {
            final ConsumerGroupInfo created =
                new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
            final ConsumerGroupInfo existing = this.consumerTable.putIfAbsent(group, created);
            if (existing == null) {
                groupInfo = created;
            } else {
                groupInfo = existing;
            }
        }

        // Evaluate both updates unconditionally before combining the results.
        final boolean channelChanged =
            groupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);
        final boolean subscriptionChanged = groupInfo.updateSubscription(subList);
        final boolean changed = channelChanged || subscriptionChanged;

        if (changed && isNotifyConsumerIdsChangedEnable) {
            this.consumerIdsChangeListener.consumerIdsChanged(group, groupInfo.getAllChannel());
        }

        return changed;
    }

    /**
     * Unregisters a consumer channel from a group.
     * <p>
     * Does nothing when {@code consumerTable} has no entry for {@code group}. Otherwise the
     * channel is unregistered from the group; if the group's channel table is then empty the
     * group is removed from {@code consumerTable} (logged when the removal returns non-null).
     * Finally, if enabled, the change listener is invoked with the group's
     * {@code getAllChannel()} result.
     *
     * @param group                            consumer group name
     * @param clientChannelInfo                channel information passed to {@code unregisterChannel}
     * @param isNotifyConsumerIdsChangedEnable whether to invoke the change listener afterwards
     */
    public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
                                   boolean isNotifyConsumerIdsChangedEnable) {
        final ConsumerGroupInfo groupInfo = this.consumerTable.get(group);
        if (groupInfo == null) {
            return;
        }

        groupInfo.unregisterChannel(clientChannelInfo);
        if (groupInfo.getChannelInfoTable().isEmpty() && this.consumerTable.remove(group) != null) {
            log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);
        }

        if (isNotifyConsumerIdsChangedEnable) {
            this.consumerIdsChangeListener.consumerIdsChanged(group, groupInfo.getAllChannel());
        }
    }

    /**
     * Sweeps {@code consumerTable} for stale channels.
     * <p>
     * A channel entry is treated as expired when the difference between
     * {@link System#currentTimeMillis()} and its {@code getLastUpdateTimestamp()} exceeds
     * {@code CHANNEL_EXPIRED_TIMEOUT}; such a channel is closed via
     * {@code RemotingUtil.closeChannel} and its entry is removed through the iterator.
     * A group whose channel table is empty after the sweep is removed from
     * {@code consumerTable} through the outer iterator.
     */
    public void scanNotActiveChannel() {
        for (Iterator<Entry<String, ConsumerGroupInfo>> groupIt = this.consumerTable.entrySet().iterator();
             groupIt.hasNext(); ) {
            final Entry<String, ConsumerGroupInfo> groupEntry = groupIt.next();
            final String group = groupEntry.getKey();
            final ConcurrentHashMap<Channel, ClientChannelInfo> channels =
                groupEntry.getValue().getChannelInfoTable();

            for (Iterator<Entry<Channel, ClientChannelInfo>> channelIt = channels.entrySet().iterator();
                 channelIt.hasNext(); ) {
                final ClientChannelInfo clientChannelInfo = channelIt.next().getValue();
                final long idleMillis =
                    System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
                if (idleMillis > CHANNEL_EXPIRED_TIMEOUT) {
                    log.warn(
                        "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
                        RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
                    RemotingUtil.closeChannel(clientChannelInfo.getChannel());
                    channelIt.remove();
                }
            }

            if (channels.isEmpty()) {
                log.warn(
                    "SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
                    group);
                groupIt.remove();
            }
        }
    }

    /**
     * Collects the names of all groups whose subscription table has {@code topic} as a key.
     *
     * @param topic topic name to look for
     * @return a new {@code HashSet} of matching group names; empty when none match
     */
    public HashSet<String> queryTopicConsumeByWho(final String topic) {
        final HashSet<String> groups = new HashSet<String>();
        for (Entry<String, ConsumerGroupInfo> entry : this.consumerTable.entrySet()) {
            if (entry.getValue().getSubscriptionTable().containsKey(topic)) {
                groups.add(entry.getKey());
            }
        }
        return groups;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java new file mode 100644 index 0000000..0095913 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.broker.client; + +import com.alibaba.rocketmq.broker.BrokerController; +import io.netty.channel.Channel; + +import java.util.List; + +
/**
 * {@link ConsumerIdsChangeListener} implementation that, for every channel it is handed,
 * calls {@code notifyConsumerIdsChanged} on the broker controller's {@code Broker2Client}.
 *
 * @author shijia.wxr
 */
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
    /** Controller used to read the broker config flag and to reach the {@code Broker2Client}. */
    private final BrokerController brokerController;


    /**
     * @param brokerController controller this listener reads its config from and notifies through
     */
    public DefaultConsumerIdsChangeListener(BrokerController brokerController) {
        this.brokerController = brokerController;
    }


    /**
     * Notifies each given channel that the consumer ids of {@code group} changed.
     * <p>
     * Nothing happens when {@code channels} is {@code null} or when the broker config's
     * {@code isNotifyConsumerIdsChangedEnable()} flag is {@code false}.
     *
     * @param group    consumer group name passed along with each notification
     * @param channels channels to notify; may be {@code null}
     */
    @Override
    public void consumerIdsChanged(String group, List<Channel> channels) {
        if (channels == null) {
            return;
        }
        if (!this.brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
            return;
        }

        for (Channel target : channels) {
            this.brokerController.getBroker2Client().notifyConsumerIdsChanged(target, group);
        }
    }
}
