This is an automated email from the ASF dual-hosted git repository. dongeforever pushed a commit to branch 5.0.0-alpha-static-topic in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 1c69b64806001b97a3a279dfaeb730ada8c6aa05 Author: dongeforever <[email protected]> AuthorDate: Wed Dec 15 14:59:19 2021 +0800 Fix check style --- .../apache/rocketmq/broker/BrokerController.java | 295 ++++++++++----------- 1 file changed, 144 insertions(+), 151 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index d0d319d..76c8007 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -157,7 +157,7 @@ public class BrokerController { private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager(); private final BrokerOuterAPI brokerOuterAPI; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( - "BrokerControllerScheduledThread")); + "BrokerControllerScheduledThread")); private final SlaveSynchronize slaveSynchronize; private final BlockingQueue<Runnable> sendThreadPoolQueue; private final BlockingQueue<Runnable> ackThreadPoolQueue; @@ -204,10 +204,10 @@ public class BrokerController { private long shouldStartTime; public BrokerController( - final BrokerConfig brokerConfig, - final NettyServerConfig nettyServerConfig, - final NettyClientConfig nettyClientConfig, - final MessageStoreConfig messageStoreConfig + final BrokerConfig brokerConfig, + final NettyServerConfig nettyServerConfig, + final NettyClientConfig nettyClientConfig, + final MessageStoreConfig messageStoreConfig ) { this.brokerConfig = brokerConfig; this.nettyServerConfig = nettyServerConfig; @@ -223,7 +223,7 @@ public class BrokerController { this.changeInvisibleTimeProcessor = new ChangeInvisibleTimeProcessor(this); this.sendMessageProcessor = new SendMessageProcessor(this); this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService, - this.popMessageProcessor); + this.popMessageProcessor); this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this); this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener); this.consumerFilterManager = new ConsumerFilterManager(this); @@ -255,9 +255,9 @@ public class BrokerController { this.brokerFastFailure = new BrokerFastFailure(this); this.configuration = new Configuration( - log, - BrokerPathConfigHelper.getBrokerConfigPath(), - this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig + log, + BrokerPathConfigHelper.getBrokerConfigPath(), + this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig ); } @@ -297,8 +297,8 @@ public class BrokerController { if (result) { try { this.messageStore = - new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, - this.brokerConfig); + new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, + this.brokerConfig); if (messageStoreConfig.isEnableDLegerCommitLog()) { DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore); ((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); @@ -322,77 +322,76 @@ public class BrokerController { fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService); this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getSendMessageThreadPoolNums(), - this.brokerConfig.getSendMessageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.sendThreadPoolQueue, - new ThreadFactoryImpl("SendMessageThread_")); + this.brokerConfig.getSendMessageThreadPoolNums(), + this.brokerConfig.getSendMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.sendThreadPoolQueue, + new ThreadFactoryImpl("SendMessageThread_")); this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getPullMessageThreadPoolNums(), - this.brokerConfig.getPullMessageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.pullThreadPoolQueue, - new ThreadFactoryImpl("PullMessageThread_")); + this.brokerConfig.getPullMessageThreadPoolNums(), + this.brokerConfig.getPullMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.pullThreadPoolQueue, + new ThreadFactoryImpl("PullMessageThread_")); this.ackMessageExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getAckMessageThreadPoolNums(), - this.brokerConfig.getAckMessageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.ackThreadPoolQueue, - new ThreadFactoryImpl("AckMessageThread_")); - + this.brokerConfig.getAckMessageThreadPoolNums(), + this.brokerConfig.getAckMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.ackThreadPoolQueue, + new ThreadFactoryImpl("AckMessageThread_")); this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getProcessReplyMessageThreadPoolNums(), - this.brokerConfig.getProcessReplyMessageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.replyThreadPoolQueue, - new ThreadFactoryImpl("ProcessReplyMessageThread_")); + this.brokerConfig.getProcessReplyMessageThreadPoolNums(), + this.brokerConfig.getProcessReplyMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.replyThreadPoolQueue, + new ThreadFactoryImpl("ProcessReplyMessageThread_")); this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getQueryMessageThreadPoolNums(), - this.brokerConfig.getQueryMessageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.queryThreadPoolQueue, - new ThreadFactoryImpl("QueryMessageThread_")); + this.brokerConfig.getQueryMessageThreadPoolNums(), + this.brokerConfig.getQueryMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.queryThreadPoolQueue, + new ThreadFactoryImpl("QueryMessageThread_")); this.adminBrokerExecutor = - Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( - "AdminBrokerThread_")); + Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( + "AdminBrokerThread_")); this.clientManageExecutor = new ThreadPoolExecutor( - this.brokerConfig.getClientManageThreadPoolNums(), - this.brokerConfig.getClientManageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.clientManagerThreadPoolQueue, - new ThreadFactoryImpl("ClientManageThread_")); + this.brokerConfig.getClientManageThreadPoolNums(), + this.brokerConfig.getClientManageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.clientManagerThreadPoolQueue, + new ThreadFactoryImpl("ClientManageThread_")); this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getHeartbeatThreadPoolNums(), - this.brokerConfig.getHeartbeatThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.heartbeatThreadPoolQueue, - new ThreadFactoryImpl("HeartbeatThread_", true)); + this.brokerConfig.getHeartbeatThreadPoolNums(), + this.brokerConfig.getHeartbeatThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.heartbeatThreadPoolQueue, + new ThreadFactoryImpl("HeartbeatThread_", true)); this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getEndTransactionThreadPoolNums(), - this.brokerConfig.getEndTransactionThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.endTransactionThreadPoolQueue, - new ThreadFactoryImpl("EndTransactionThread_")); + this.brokerConfig.getEndTransactionThreadPoolNums(), + this.brokerConfig.getEndTransactionThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.endTransactionThreadPoolQueue, + new ThreadFactoryImpl("EndTransactionThread_")); this.consumerManageExecutor = - Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( - "ConsumerManageThread_")); + Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( + "ConsumerManageThread_")); this.registerProcessor(); @@ -466,8 +465,8 @@ public class BrokerController { }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); this.loadBalanceExecutor = - Executors.newFixedThreadPool(this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(), new ThreadFactoryImpl( - "LoadBalanceProcessorThread_")); + Executors.newFixedThreadPool(this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(), new ThreadFactoryImpl( + "LoadBalanceProcessorThread_")); if (this.brokerConfig.getNamesrvAddr() != null) { this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr()); @@ -494,7 +493,6 @@ public class BrokerController { } }, 1, 5, TimeUnit.SECONDS); - if (!messageStoreConfig.isEnableDLegerCommitLog()) { if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) { @@ -523,38 +521,38 @@ public class BrokerController { // Register a listener to reload SslContext try { fileWatchService = new FileWatchService( - new String[]{ - TlsSystemConfig.tlsServerCertPath, - TlsSystemConfig.tlsServerKeyPath, - TlsSystemConfig.tlsServerTrustCertPath - }, - new FileWatchService.Listener() { - boolean certChanged, keyChanged = false; - - @Override - public void onChanged(String path) { - if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { - log.info("The trust certificate changed, reload the ssl context"); - reloadServerSslContext(); - } - if (path.equals(TlsSystemConfig.tlsServerCertPath)) { - certChanged = true; - } - if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { - keyChanged = true; - } - if (certChanged && keyChanged) { - log.info("The certificate and private key changed, reload the ssl context"); - certChanged = keyChanged = false; - reloadServerSslContext(); - } + new String[] { + TlsSystemConfig.tlsServerCertPath, + TlsSystemConfig.tlsServerKeyPath, + TlsSystemConfig.tlsServerTrustCertPath + }, + new FileWatchService.Listener() { + boolean certChanged, keyChanged = false; + + @Override + public void onChanged(String path) { + if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { + log.info("The trust certificate changed, reload the ssl context"); + reloadServerSslContext(); } - - private void reloadServerSslContext() { - ((NettyRemotingServer) remotingServer).loadSslContext(); - ((NettyRemotingServer) fastRemotingServer).loadSslContext(); + if (path.equals(TlsSystemConfig.tlsServerCertPath)) { + certChanged = true; + } + if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { + keyChanged = true; + } + if (certChanged && keyChanged) { + log.info("The certificate and private key changed, reload the ssl context"); + certChanged = keyChanged = false; + reloadServerSslContext(); } - }); + } + + private void reloadServerSslContext() { + ((NettyRemotingServer) remotingServer).loadSslContext(); + ((NettyRemotingServer) fastRemotingServer).loadSslContext(); + } + }); } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } @@ -611,7 +609,6 @@ public class BrokerController { } } - private void initialRpcHooks() { List<RPCHook> rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class); @@ -623,7 +620,6 @@ public class BrokerController { } } - public String getBrokerAddrByName(String brokerName) { return this.brokerName2AddrMap.get(brokerName); } @@ -962,10 +958,10 @@ public class BrokerController { private void unregisterBrokerAll() { this.brokerOuterAPI.unregisterBrokerAll( - this.brokerConfig.getBrokerClusterName(), - this.getBrokerAddr(), - this.brokerConfig.getBrokerName(), - this.brokerConfig.getBrokerId()); + this.brokerConfig.getBrokerClusterName(), + this.getBrokerAddr(), + this.brokerConfig.getBrokerName(), + this.brokerConfig.getBrokerId()); } public String getBrokerAddr() { @@ -1048,7 +1044,6 @@ public class BrokerController { this.brokerFastFailure.start(); } - } public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) { @@ -1064,30 +1059,30 @@ public class BrokerController { topicConfigSerializeWrapper.setDataVersion(dataVersion); ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigList.stream() - .map(topicConfig -> { - TopicConfig registerTopicConfig; - if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) - || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { - registerTopicConfig = - new TopicConfig(topicConfig.getTopicName(), - topicConfig.getReadQueueNums(), - topicConfig.getWriteQueueNums(), - this.brokerConfig.getBrokerPermission()); - } else { - registerTopicConfig = new TopicConfig(topicConfig); - } - return registerTopicConfig; - }) - .collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity())); + .map(topicConfig -> { + TopicConfig registerTopicConfig; + if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) + || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { + registerTopicConfig = + new TopicConfig(topicConfig.getTopicName(), + topicConfig.getReadQueueNums(), + topicConfig.getWriteQueueNums(), + this.brokerConfig.getBrokerPermission()); + } else { + registerTopicConfig = new TopicConfig(topicConfig); + } + return registerTopicConfig; + }) + .collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity())); topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable); Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = topicConfigList.stream() - .map(TopicConfig::getTopicName) - .map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName)) - .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, TopicQueueMappingDetail.cloneAsMappingInfo(info))) - .orElse(null)) - .filter(Objects::nonNull) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + .map(TopicConfig::getTopicName) + .map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName)) + .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, TopicQueueMappingDetail.cloneAsMappingInfo(info))) + .orElse(null)) + .filter(Objects::nonNull) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); if (!topicQueueMappingInfoMap.isEmpty()) { topicConfigSerializeWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap); } @@ -1107,39 +1102,39 @@ public class BrokerController { ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) - || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { + || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) { TopicConfig tmp = - new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), - this.brokerConfig.getBrokerPermission()); + new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), + this.brokerConfig.getBrokerPermission()); topicConfigTable.put(topicConfig.getTopicName(), tmp); } topicConfigWrapper.setTopicConfigTable(topicConfigTable); } if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), - this.getBrokerAddr(), - this.brokerConfig.getBrokerName(), - this.brokerConfig.getBrokerId(), - this.brokerConfig.getRegisterBrokerTimeoutMills())) { + this.getBrokerAddr(), + this.brokerConfig.getBrokerName(), + this.brokerConfig.getBrokerId(), + this.brokerConfig.getRegisterBrokerTimeoutMills())) { doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); } } private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway, - TopicConfigAndMappingSerializeWrapper topicConfigWrapper) { + TopicConfigAndMappingSerializeWrapper topicConfigWrapper) { List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll( - this.brokerConfig.getBrokerClusterName(), - this.getBrokerAddr(), - this.brokerConfig.getBrokerName(), - this.brokerConfig.getBrokerId(), - this.getHAServerAddr(), - topicConfigWrapper, - this.filterServerManager.buildNewFilterServerList(), - oneway, - this.brokerConfig.getRegisterBrokerTimeoutMills(), - this.brokerConfig.isCompressedRegister()); + this.brokerConfig.getBrokerClusterName(), + this.getBrokerAddr(), + this.brokerConfig.getBrokerName(), + this.brokerConfig.getBrokerId(), + this.getHAServerAddr(), + topicConfigWrapper, + this.filterServerManager.buildNewFilterServerList(), + oneway, + this.brokerConfig.getRegisterBrokerTimeoutMills(), + this.brokerConfig.isCompressedRegister()); if (registerBrokerResultList.size() > 0) { RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0); @@ -1158,10 +1153,10 @@ public class BrokerController { } private boolean needRegister(final String clusterName, - final String brokerAddr, - final String brokerName, - final long brokerId, - final int timeoutMills) { + final String brokerAddr, + final String brokerName, + final long brokerId, + final int timeoutMills) { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills); @@ -1279,7 +1274,7 @@ public class BrokerController { } public void setTransactionalMessageCheckService( - TransactionalMessageCheckService transactionalMessageCheckService) { + TransactionalMessageCheckService transactionalMessageCheckService) { this.transactionalMessageCheckService = transactionalMessageCheckService; } @@ -1296,11 +1291,10 @@ public class BrokerController { } public void setTransactionalMessageCheckListener( - AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) { + AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) { this.transactionalMessageCheckListener = transactionalMessageCheckListener; } - public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() { return endTransactionThreadPoolQueue; @@ -1367,7 +1361,6 @@ public class BrokerController { log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId); } - public void changeToMaster(BrokerRole role) { if (role == BrokerRole.SLAVE) { return;
