This is an automated email from the ASF dual-hosted git repository. dongeforever pushed a commit to branch 5.0.0-alpha-static-topic in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit ba2c768f5ff6fb22a5b9a3660bdbb078b95c185f Author: dongeforever <[email protected]> AuthorDate: Wed Dec 15 14:57:32 2021 +0800 Fix check stype --- .../apache/rocketmq/broker/BrokerController.java | 304 ++++++++++----------- .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 23 +- .../broker/processor/AdminBrokerProcessor.java | 15 +- .../broker/processor/ConsumerManageProcessor.java | 21 +- .../rocketmq/broker/slave/SlaveSynchronize.java | 8 +- .../topic/TopicQueueMappingCleanService.java | 6 +- .../broker/topic/TopicQueueMappingManager.java | 13 +- .../consumer/store/RemoteBrokerOffsetStore.java | 18 +- .../client/exception/OffsetNotFoundException.java | 16 ++ .../rocketmq/client/impl/MQClientAPIImpl.java | 2 +- .../client/impl/consumer/PullAPIWrapper.java | 21 +- .../client/impl/factory/MQClientInstance.java | 51 ++-- .../header/GetTopicConfigRequestHeader.java | 2 - .../apache/rocketmq/common/rpc/ClientMetadata.java | 16 ++ .../apache/rocketmq/common/rpc/RequestBuilder.java | 16 ++ .../org/apache/rocketmq/common/rpc/RpcClient.java | 16 ++ .../apache/rocketmq/common/rpc/RpcClientHook.java | 18 +- .../apache/rocketmq/common/rpc/RpcClientImpl.java | 33 ++- .../apache/rocketmq/common/rpc/RpcClientUtils.java | 16 ++ .../apache/rocketmq/common/rpc/RpcException.java | 16 ++ .../common/statictopic/LogicQueueMappingItem.java | 16 ++ .../common/statictopic/TopicQueueMappingInfo.java | 19 +- .../common/statictopic/TopicQueueMappingUtils.java | 10 +- .../statictopic/TopicRemappingDetailWrapper.java | 16 ++ .../remoting/protocol/RemotingCommand.java | 2 - .../test/client/rmq/RMQNormalProducer.java | 3 - .../rocketmq/test/util/MQAdminTestUtils.java | 101 +++---- .../rocketmq/tools/admin/DefaultMQAdminExt.java | 2 - .../tools/admin/DefaultMQAdminExtImpl.java | 4 - .../apache/rocketmq/tools/admin/MQAdminExt.java | 2 - .../apache/rocketmq/tools/admin/MQAdminUtils.java | 20 +- .../topic/RemappingStaticTopicSubCommand.java | 8 +- .../command/topic/UpdateStaticTopicSubCommand.java | 9 +- 33 files changed, 473 insertions(+), 370 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 338f31e..d0d319d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -157,7 +157,7 @@ public class BrokerController { private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager(); private final BrokerOuterAPI brokerOuterAPI; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( - "BrokerControllerScheduledThread")); + "BrokerControllerScheduledThread")); private final SlaveSynchronize slaveSynchronize; private final BlockingQueue<Runnable> sendThreadPoolQueue; private final BlockingQueue<Runnable> ackThreadPoolQueue; @@ -200,14 +200,14 @@ public class BrokerController { private TransactionalMessageService transactionalMessageService; private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener; private Future<?> slaveSyncFuture; - private Map<Class,AccessValidator> accessValidatorMap = new HashMap<Class, AccessValidator>(); + private Map<Class, AccessValidator> accessValidatorMap = new HashMap<Class, AccessValidator>(); private long shouldStartTime; public BrokerController( - final BrokerConfig brokerConfig, - final NettyServerConfig nettyServerConfig, - final NettyClientConfig nettyClientConfig, - final MessageStoreConfig messageStoreConfig + final BrokerConfig brokerConfig, + final NettyServerConfig nettyServerConfig, + final NettyClientConfig nettyClientConfig, + final MessageStoreConfig messageStoreConfig ) { this.brokerConfig = brokerConfig; this.nettyServerConfig = nettyServerConfig; @@ -223,7 +223,7 @@ public class BrokerController { this.changeInvisibleTimeProcessor = new ChangeInvisibleTimeProcessor(this); this.sendMessageProcessor = new SendMessageProcessor(this); this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService, - this.popMessageProcessor); + this.popMessageProcessor); this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this); this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener); this.consumerFilterManager = new ConsumerFilterManager(this); @@ -255,9 +255,9 @@ public class BrokerController { this.brokerFastFailure = new BrokerFastFailure(this); this.configuration = new Configuration( - log, - BrokerPathConfigHelper.getBrokerConfigPath(), - this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig + log, + BrokerPathConfigHelper.getBrokerConfigPath(), + this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig ); } @@ -297,11 +297,11 @@ public class BrokerController { if (result) { try { this.messageStore = - new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, - this.brokerConfig); + new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, + this.brokerConfig); if (messageStoreConfig.isEnableDLegerCommitLog()) { DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore); - ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); + ((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); } this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); //load plugin @@ -322,77 +322,77 @@ public class BrokerController { fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService); this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getSendMessageThreadPoolNums(), - this.brokerConfig.getSendMessageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.sendThreadPoolQueue, - new ThreadFactoryImpl("SendMessageThread_")); + this.brokerConfig.getSendMessageThreadPoolNums(), + this.brokerConfig.getSendMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.sendThreadPoolQueue, + new ThreadFactoryImpl("SendMessageThread_")); this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getPullMessageThreadPoolNums(), - this.brokerConfig.getPullMessageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.pullThreadPoolQueue, - new ThreadFactoryImpl("PullMessageThread_")); + this.brokerConfig.getPullMessageThreadPoolNums(), + this.brokerConfig.getPullMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.pullThreadPoolQueue, + new ThreadFactoryImpl("PullMessageThread_")); this.ackMessageExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getAckMessageThreadPoolNums(), - this.brokerConfig.getAckMessageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.ackThreadPoolQueue, - new ThreadFactoryImpl("AckMessageThread_")); + this.brokerConfig.getAckMessageThreadPoolNums(), + this.brokerConfig.getAckMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.ackThreadPoolQueue, + new ThreadFactoryImpl("AckMessageThread_")); this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getProcessReplyMessageThreadPoolNums(), - this.brokerConfig.getProcessReplyMessageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.replyThreadPoolQueue, - new ThreadFactoryImpl("ProcessReplyMessageThread_")); + this.brokerConfig.getProcessReplyMessageThreadPoolNums(), + this.brokerConfig.getProcessReplyMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.replyThreadPoolQueue, + new ThreadFactoryImpl("ProcessReplyMessageThread_")); this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getQueryMessageThreadPoolNums(), - this.brokerConfig.getQueryMessageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.queryThreadPoolQueue, - new ThreadFactoryImpl("QueryMessageThread_")); + this.brokerConfig.getQueryMessageThreadPoolNums(), + this.brokerConfig.getQueryMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.queryThreadPoolQueue, + new ThreadFactoryImpl("QueryMessageThread_")); this.adminBrokerExecutor = - Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( - "AdminBrokerThread_")); + Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( + "AdminBrokerThread_")); this.clientManageExecutor = new ThreadPoolExecutor( - this.brokerConfig.getClientManageThreadPoolNums(), - this.brokerConfig.getClientManageThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.clientManagerThreadPoolQueue, - new ThreadFactoryImpl("ClientManageThread_")); + this.brokerConfig.getClientManageThreadPoolNums(), + this.brokerConfig.getClientManageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.clientManagerThreadPoolQueue, + new ThreadFactoryImpl("ClientManageThread_")); this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getHeartbeatThreadPoolNums(), - this.brokerConfig.getHeartbeatThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.heartbeatThreadPoolQueue, - new ThreadFactoryImpl("HeartbeatThread_", true)); + this.brokerConfig.getHeartbeatThreadPoolNums(), + this.brokerConfig.getHeartbeatThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.heartbeatThreadPoolQueue, + new ThreadFactoryImpl("HeartbeatThread_", true)); this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor( - this.brokerConfig.getEndTransactionThreadPoolNums(), - this.brokerConfig.getEndTransactionThreadPoolNums(), - 1000 * 60, - TimeUnit.MILLISECONDS, - this.endTransactionThreadPoolQueue, - new ThreadFactoryImpl("EndTransactionThread_")); + this.brokerConfig.getEndTransactionThreadPoolNums(), + this.brokerConfig.getEndTransactionThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.endTransactionThreadPoolQueue, + new ThreadFactoryImpl("EndTransactionThread_")); this.consumerManageExecutor = - Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( - "ConsumerManageThread_")); + Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( + "ConsumerManageThread_")); this.registerProcessor(); @@ -466,8 +466,8 @@ public class BrokerController { }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); this.loadBalanceExecutor = - Executors.newFixedThreadPool(this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(), new ThreadFactoryImpl( - "LoadBalanceProcessorThread_")); + Executors.newFixedThreadPool(this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(), new ThreadFactoryImpl( + "LoadBalanceProcessorThread_")); if (this.brokerConfig.getNamesrvAddr() != null) { this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr()); @@ -523,38 +523,38 @@ public class BrokerController { // Register a listener to reload SslContext try { fileWatchService = new FileWatchService( - new String[] { - TlsSystemConfig.tlsServerCertPath, - TlsSystemConfig.tlsServerKeyPath, - TlsSystemConfig.tlsServerTrustCertPath - }, - new FileWatchService.Listener() { - boolean certChanged, keyChanged = false; - - @Override - public void onChanged(String path) { - if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { - log.info("The trust certificate changed, reload the ssl context"); - reloadServerSslContext(); + new String[]{ + TlsSystemConfig.tlsServerCertPath, + TlsSystemConfig.tlsServerKeyPath, + TlsSystemConfig.tlsServerTrustCertPath + }, + new FileWatchService.Listener() { + boolean certChanged, keyChanged = false; + + @Override + public void onChanged(String path) { + if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { + log.info("The trust certificate changed, reload the ssl context"); + reloadServerSslContext(); + } + if (path.equals(TlsSystemConfig.tlsServerCertPath)) { + certChanged = true; + } + if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { + keyChanged = true; + } + if (certChanged && keyChanged) { + log.info("The certificate and private key changed, reload the ssl context"); + certChanged = keyChanged = false; + reloadServerSslContext(); + } } - if (path.equals(TlsSystemConfig.tlsServerCertPath)) { - certChanged = true; - } - if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { - keyChanged = true; - } - if (certChanged && keyChanged) { - log.info("The certificate and private key changed, reload the ssl context"); - certChanged = keyChanged = false; - reloadServerSslContext(); - } - } - private void reloadServerSslContext() { - ((NettyRemotingServer) remotingServer).loadSslContext(); - ((NettyRemotingServer) fastRemotingServer).loadSslContext(); - } - }); + private void reloadServerSslContext() { + ((NettyRemotingServer) remotingServer).loadSslContext(); + ((NettyRemotingServer) fastRemotingServer).loadSslContext(); + } + }); } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } @@ -593,9 +593,9 @@ public class BrokerController { return; } - for (AccessValidator accessValidator: accessValidators) { + for (AccessValidator accessValidator : accessValidators) { final AccessValidator validator = accessValidator; - accessValidatorMap.put(validator.getClass(),validator); + accessValidatorMap.put(validator.getClass(), validator); this.registerServerRPCHook(new RPCHook() { @Override @@ -618,7 +618,7 @@ public class BrokerController { if (rpcHooks == null || rpcHooks.isEmpty()) { return; } - for (RPCHook rpcHook: rpcHooks) { + for (RPCHook rpcHook : rpcHooks) { this.registerServerRPCHook(rpcHook); } } @@ -962,10 +962,10 @@ public class BrokerController { private void unregisterBrokerAll() { this.brokerOuterAPI.unregisterBrokerAll( - this.brokerConfig.getBrokerClusterName(), - this.getBrokerAddr(), - this.brokerConfig.getBrokerName(), - this.brokerConfig.getBrokerId()); + this.brokerConfig.getBrokerClusterName(), + this.getBrokerAddr(), + this.brokerConfig.getBrokerName(), + this.brokerConfig.getBrokerId()); } public String getBrokerAddr() { @@ -1064,30 +1064,30 @@ public class BrokerController { topicConfigSerializeWrapper.setDataVersion(dataVersion); ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigList.stream() - .map(topicConfig -> { - TopicConfig registerTopicConfig; - if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) - || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { - registerTopicConfig = - new TopicConfig(topicConfig.getTopicName(), - topicConfig.getReadQueueNums(), - topicConfig.getWriteQueueNums(), - this.brokerConfig.getBrokerPermission()); - } else { - registerTopicConfig = new TopicConfig(topicConfig); - } - return registerTopicConfig; - }) - .collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity())); + .map(topicConfig -> { + TopicConfig registerTopicConfig; + if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) + || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { + registerTopicConfig = + new TopicConfig(topicConfig.getTopicName(), + topicConfig.getReadQueueNums(), + topicConfig.getWriteQueueNums(), + this.brokerConfig.getBrokerPermission()); + } else { + registerTopicConfig = new TopicConfig(topicConfig); + } + return registerTopicConfig; + }) + .collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity())); topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable); Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = topicConfigList.stream() - .map(TopicConfig::getTopicName) - .map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName)) - .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, TopicQueueMappingDetail.cloneAsMappingInfo(info))) - .orElse(null)) - .filter(Objects::nonNull) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + .map(TopicConfig::getTopicName) + .map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName)) + .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, TopicQueueMappingDetail.cloneAsMappingInfo(info))) + .orElse(null)) + .filter(Objects::nonNull) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); if (!topicQueueMappingInfoMap.isEmpty()) { topicConfigSerializeWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap); } @@ -1103,43 +1103,43 @@ public class BrokerController { topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable()); topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map( - entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue())) + entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue())) ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) - || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { + || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) { TopicConfig tmp = - new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), - this.brokerConfig.getBrokerPermission()); + new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), + this.brokerConfig.getBrokerPermission()); topicConfigTable.put(topicConfig.getTopicName(), tmp); } topicConfigWrapper.setTopicConfigTable(topicConfigTable); } if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), - this.getBrokerAddr(), - this.brokerConfig.getBrokerName(), - this.brokerConfig.getBrokerId(), - this.brokerConfig.getRegisterBrokerTimeoutMills())) { + this.getBrokerAddr(), + this.brokerConfig.getBrokerName(), + this.brokerConfig.getBrokerId(), + this.brokerConfig.getRegisterBrokerTimeoutMills())) { doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); } } private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway, - TopicConfigAndMappingSerializeWrapper topicConfigWrapper) { + TopicConfigAndMappingSerializeWrapper topicConfigWrapper) { List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll( - this.brokerConfig.getBrokerClusterName(), - this.getBrokerAddr(), - this.brokerConfig.getBrokerName(), - this.brokerConfig.getBrokerId(), - this.getHAServerAddr(), - topicConfigWrapper, - this.filterServerManager.buildNewFilterServerList(), - oneway, - this.brokerConfig.getRegisterBrokerTimeoutMills(), - this.brokerConfig.isCompressedRegister()); + this.brokerConfig.getBrokerClusterName(), + this.getBrokerAddr(), + this.brokerConfig.getBrokerName(), + this.brokerConfig.getBrokerId(), + this.getHAServerAddr(), + topicConfigWrapper, + this.filterServerManager.buildNewFilterServerList(), + oneway, + this.brokerConfig.getRegisterBrokerTimeoutMills(), + this.brokerConfig.isCompressedRegister()); if (registerBrokerResultList.size() > 0) { RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0); @@ -1158,10 +1158,10 @@ public class BrokerController { } private boolean needRegister(final String clusterName, - final String brokerAddr, - final String brokerName, - final long brokerId, - final int timeoutMills) { + final String brokerAddr, + final String brokerName, + final long brokerId, + final int timeoutMills) { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills); @@ -1279,7 +1279,7 @@ public class BrokerController { } public void setTransactionalMessageCheckService( - TransactionalMessageCheckService transactionalMessageCheckService) { + TransactionalMessageCheckService transactionalMessageCheckService) { this.transactionalMessageCheckService = transactionalMessageCheckService; } @@ -1296,7 +1296,7 @@ public class BrokerController { } public void setTransactionalMessageCheckListener( - AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) { + AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) { this.transactionalMessageCheckListener = transactionalMessageCheckListener; } @@ -1321,8 +1321,7 @@ public class BrokerController { public void run() { try { BrokerController.this.slaveSynchronize.syncAll(); - } - catch (Throwable e) { + } catch (Throwable e) { log.error("ScheduledTask SlaveSynchronize syncAll error.", e); } } @@ -1369,7 +1368,6 @@ public class BrokerController { } - public void changeToMaster(BrokerRole role) { if (role == BrokerRole.SLAVE) { return; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 6a5b31e..8a2093a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -16,15 +16,6 @@ */ package org.apache.rocketmq.broker.out; -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import com.alibaba.fastjson.JSON; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor; import org.apache.rocketmq.client.exception.MQBrokerException; @@ -44,10 +35,6 @@ import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; -import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; -import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; -import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; -import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader; @@ -63,8 +50,6 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RemotingClient; -import org.apache.rocketmq.common.rpc.RpcRequest; -import org.apache.rocketmq.common.rpc.RpcResponse; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -75,6 +60,14 @@ import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + public class BrokerOuterAPI { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final RemotingClient remotingClient; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 02a0060..0d2a59b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -20,7 +20,6 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.util.concurrent.CompleteFuture; import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.acl.plain.PlainAccessValidator; import org.apache.rocketmq.broker.BrokerController; @@ -154,8 +153,6 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Future; -import java.util.function.BiConsumer; import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; @@ -651,7 +648,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements if (!mappingContext.isLeader()) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); } - //TODO should make sure the timestampOfOffset is equal or bigger than the searched timestamp + //TO DO should make sure the timestampOfOffset is equal or bigger than the searched timestamp Long timestamp = requestHeader.getTimestamp(); long offset = -1; for (int i = 0; i < mappingItems.size(); i++) { @@ -795,7 +792,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements //this may not return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d is not leader in broker %s, request code %d", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname(), request.getCode())))); - }; + } GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader(); LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true); assert mappingItem != null; @@ -862,14 +859,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); if (!mappingContext.isLeader()) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); - }; + } LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true); assert mappingItem != null; try { requestHeader.setBname(mappingItem.getBname()); requestHeader.setLo(false); RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null); - //TODO check if it is in current broker + //TO DO check if it is in current broker RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { throw rpcResponse.getException(); @@ -882,7 +879,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; - }catch (Throwable t) { + } catch (Throwable t) { return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); } } @@ -1058,7 +1055,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody()); } TopicStatsTable topicStatsTable = new TopicStatsTable(); - qidItemMap.forEach( (qid, itemPair) -> { + qidItemMap.forEach((qid, itemPair) -> { LogicQueueMappingItem minItem = itemPair[0]; LogicQueueMappingItem maxItem = itemPair[1]; TopicOffset minTopicOffset = statsTable.get(minItem.getBname()).getOffsetTable().get(new MessageQueue(topic, minItem.getBname(), minItem.getQueueId())); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java index 66abe62..04e705b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java @@ -17,18 +17,9 @@ package org.apache.rocketmq.broker.processor; import io.netty.channel.ChannelHandlerContext; -import java.util.List; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.rpc.RpcRequest; -import org.apache.rocketmq.common.rpc.RpcResponse; -import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader; -import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; -import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader; @@ -38,6 +29,12 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHead import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader; +import org.apache.rocketmq.common.rpc.RpcRequest; +import org.apache.rocketmq.common.rpc.RpcResponse; +import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; @@ -45,6 +42,8 @@ import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import java.util.List; + import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { @@ -177,7 +176,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen if (rpcResponse.getCode() == ResponseCode.SUCCESS) { offset = ((QueryConsumerOffsetResponseHeader) rpcResponse.getHeader()).getOffset(); break; - } else if (rpcResponse.getCode() == ResponseCode.QUERY_NOT_FOUND){ + } else if (rpcResponse.getCode() == ResponseCode.QUERY_NOT_FOUND) { continue; } else { //this should not happen @@ -234,7 +233,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen if (requestHeader.getSetZeroIfNotFound() != null && Boolean.FALSE.equals(requestHeader.getSetZeroIfNotFound())) { response.setCode(ResponseCode.QUERY_NOT_FOUND); response.setRemark("Not found, do not set to zero, maybe this group boot first"); - }else if (minOffset <= 0 + } else if (minOffset <= 0 && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset( requestHeader.getTopic(), requestHeader.getQueueId(), 0)) { responseHeader.setOffset(0L); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index 0029318..9bb09bd 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -16,19 +16,19 @@ */ package org.apache.rocketmq.broker.slave; -import java.io.IOException; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper; +import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; -import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper; -import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; -import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.store.config.StorePathConfigHelper; +import java.io.IOException; + public class SlaveSynchronize { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java index 9518de0..0533668 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java @@ -104,7 +104,7 @@ public class TopicQueueMappingCleanService extends ServiceThread { boolean changed = false; long start = System.currentTimeMillis(); try { - for(String topic : this.topicQueueMappingManager.getTopicQueueMappingTable().keySet()) { + for (String topic : this.topicQueueMappingManager.getTopicQueueMappingTable().keySet()) { try { if (isStopped()) { break; @@ -123,7 +123,7 @@ public class TopicQueueMappingCleanService extends ServiceThread { if (items.size() <= 1) { continue; } - if(!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) { + if (!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) { continue; } LogicQueueMappingItem earlistItem = items.get(0); @@ -154,7 +154,7 @@ public class TopicQueueMappingCleanService extends ServiceThread { if (items.size() <= 1) { continue; } - if(!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) { + if (!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) { continue; } LogicQueueMappingItem earlistItem = items.get(0); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index 1633440..56fc792 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -21,18 +21,12 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.DataVersion; -import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper; -import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader; -import org.apache.rocketmq.common.rpc.RpcRequest; -import org.apache.rocketmq.common.rpc.RpcResponse; import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader; import org.apache.rocketmq.common.rpc.TopicRequestHeader; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; -import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; @@ -40,12 +34,7 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -98,7 +87,7 @@ public class TopicQueueMappingManager extends ConfigManager { } if (force) { //bakeup the old items - oldDetail.getHostedQueues().forEach( (queueId, items) -> { + oldDetail.getHostedQueues().forEach((queueId, items) -> { newDetail.getHostedQueues().putIfAbsent(queueId, items); }); topicQueueMappingTable.put(newDetail.getTopic(), newDetail); diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 91d12a0..fc0b29c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -16,13 +16,6 @@ */ package org.apache.rocketmq.client.consumer.store; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.OffsetNotFoundException; @@ -31,13 +24,20 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.exception.RemotingException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + /** * Remote storage implementation */ diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java b/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java index c3d275f..e73bbbf 100644 --- a/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java +++ b/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.client.exception; public class OffsetNotFoundException extends MQBrokerException { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 1de4fdb..0c15cff 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1235,7 +1235,7 @@ public class MQClientAPIImpl { (QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class); return responseHeader.getOffset(); } - case ResponseCode.PULL_NOT_FOUND:{ + case ResponseCode.PULL_NOT_FOUND: { throw new OffsetNotFoundException(response.getCode(), response.getRemark(), addr); } default: diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index 273add4..30e8439 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -16,17 +16,7 @@ */ package org.apache.rocketmq.client.impl.consumer; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; - -import com.alibaba.fastjson.JSON; import org.apache.rocketmq.client.consumer.PopCallback; -import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; @@ -41,18 +31,27 @@ import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.filter.ExpressionType; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.sysflag.PullSysFlag; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.exception.RemotingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + public class PullAPIWrapper { private final InternalLogger log = ClientLogger.getLog(); private final MQClientInstance mQClientFactory; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 77add20..793189e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -16,28 +16,6 @@ */ package org.apache.rocketmq.client.impl.factory; -import java.io.UnsupportedEncodingException; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import com.alibaba.fastjson.JSON; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.admin.MQAdminExtInner; @@ -66,11 +44,10 @@ import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.filter.ExpressionType; -import org.apache.rocketmq.common.message.MessageQueueAssignment; -import org.apache.rocketmq.common.protocol.NamespaceUtil; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.message.MessageQueueAssignment; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; @@ -82,12 +59,34 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import java.io.UnsupportedEncodingException; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import static org.apache.rocketmq.common.rpc.ClientMetadata.topicRouteData2EndpointsForStaticTopic; public class MQClientInstance { @@ -169,7 +168,7 @@ public class MQClientInstance { public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) { TopicPublishInfo info = new TopicPublishInfo(); - // TODO should check the usage of raw route, it is better to remove such field + // TO DO should check the usage of raw route, it is better to remove such field info.setTopicRouteData(route); if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) { String[] brokers = route.getOrderTopicConf().split(";"); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java index b701df6..bc7586a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java @@ -17,9 +17,7 @@ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.common.rpc.TopicRequestHeader; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java index 4554557..28a5e64 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.common.rpc; import org.apache.rocketmq.common.MixAll; diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java index f9478e4..9fec087 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.common.rpc; import org.apache.rocketmq.common.message.MessageQueue; diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClient.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClient.java index e4de3e0..7876fdf 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClient.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClient.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.common.rpc; import org.apache.rocketmq.common.message.MessageQueue; diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientHook.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientHook.java index ca0f2d4..e3430b5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientHook.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientHook.java @@ -1,7 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.common.rpc; -import java.util.concurrent.Future; - public abstract class RpcClientHook { //if the return is not null, return it diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java index 97879d1..62e6ec1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java @@ -1,8 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.common.rpc; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.RequestCode; @@ -10,16 +25,12 @@ import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; -import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RemotingClient; -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; @@ -190,7 +201,7 @@ public class RpcClientImpl implements RpcClient { rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody())); break; } - default:{ + default: { rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"))); } } @@ -216,7 +227,7 @@ public class RpcClientImpl implements RpcClient { rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), null, null)); break; } - default:{ + default: { rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"))); } } @@ -233,7 +244,7 @@ public class RpcClientImpl implements RpcClient { rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, RemotingSerializable.decode(responseCommand.getBody(), bodyClass))); break; } - default:{ + default: { rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"))); } } @@ -254,7 +265,7 @@ public class RpcClientImpl implements RpcClient { rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody())); break; } - default:{ + default: { rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"))); } } @@ -275,7 +286,7 @@ public class RpcClientImpl implements RpcClient { rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody())); break; } - default:{ + default: { rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"))); } } @@ -296,7 +307,7 @@ public class RpcClientImpl implements RpcClient { rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody())); break; } - default:{ + default: { rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"))); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java index 61dce64..40c6eef 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.common.rpc; import org.apache.rocketmq.remoting.protocol.RemotingCommand; diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcException.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcException.java index fc096df..36fc056 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcException.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcException.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.common.rpc; import org.apache.rocketmq.remoting.exception.RemotingException; diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java index 76e7406..3c217f5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.common.statictopic; import org.apache.commons.lang3.builder.EqualsBuilder; diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java index e9cf6f7..784247d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java @@ -15,9 +15,22 @@ * limitations under the License. */ package org.apache.rocketmq.common.statictopic; - -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index e7e7817..75f0a4f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -224,7 +224,7 @@ public class TopicQueueMappingUtils { if (newItem.getGen() < oldItem.getGen()) { //the earliest item may have been deleted concurrently inew++; - } else if (oldItem.getGen() < newItem.getGen()){ + } else if (oldItem.getGen() < newItem.getGen()) { //in the following cases, the new item-list has less items than old item-list //1. the queue is mapped back to a broker which hold the logic queue before //2. The earliest item is deleted by TopicQueueMappingCleanService @@ -260,7 +260,7 @@ public class TopicQueueMappingUtils { } int lastGen = -1; long lastOffset = -1; - for (int i = items.size() - 1; i >=0 ; i--) { + for (int i = items.size() - 1; i >= 0 ; i--) { LogicQueueMappingItem item = items.get(i); if (item.getStartOffset() < 0 || item.getGen() < 0 @@ -414,9 +414,9 @@ public class TopicQueueMappingUtils { } public static long blockSeqRoundUp(long offset, long blockSeqSize) { - long num = offset/blockSeqSize; + long num = offset / blockSeqSize; long left = offset % blockSeqSize; - if (left < blockSeqSize/2) { + if (left < blockSeqSize / 2) { return (num + 1) * blockSeqSize; } else { return (num + 2) * blockSeqSize; @@ -535,7 +535,7 @@ public class TopicQueueMappingUtils { } Map<String, Integer> brokerNumMapBeforeRemapping = new HashMap<String, Integer>(); for (TopicQueueMappingOne mappingOne: globalIdMap.values()) { - if(brokerNumMapBeforeRemapping.containsKey(mappingOne.bname)) { + if (brokerNumMapBeforeRemapping.containsKey(mappingOne.bname)) { brokerNumMapBeforeRemapping.put(mappingOne.bname, brokerNumMapBeforeRemapping.get(mappingOne.bname) + 1); } else { brokerNumMapBeforeRemapping.put(mappingOne.bname, 1); diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java index e865b6b..ce02a93 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.common.statictopic; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index 0e32226..03b1640 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -28,11 +28,9 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java index 4f5d38e..001db95 100644 --- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java @@ -152,9 +152,6 @@ public class RMQNormalProducer extends AbstractMQProducer { sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK)); sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName()); msgBodys.addData(new String(msg.getBody())); - if (originMsgs.getAllData().contains(msg)) { - System.out.println("Hash collision"); - } originMsgs.addData(msg); originMsgIndex.put(new String(msg.getBody()), metaqResult); } catch (Exception e) { diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java index 6784b76..8287d81 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java @@ -17,18 +17,10 @@ package org.apache.rocketmq.test.util; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ForkJoinPool; -import java.util.stream.Collectors; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.log4j.Logger; -import org.apache.rocketmq.acl.common.AclUtils; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; @@ -42,27 +34,29 @@ import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.srvutil.ServerUtil; -import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; -import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.MQAdminUtils; import org.apache.rocketmq.tools.command.CommandUtil; -import org.apache.rocketmq.tools.command.MQAdminStartup; import org.apache.rocketmq.tools.command.topic.RemappingStaticTopicSubCommand; import org.apache.rocketmq.tools.command.topic.UpdateStaticTopicSubCommand; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ForkJoinPool; + public class MQAdminTestUtils { private static Logger log = Logger.getLogger(MQAdminTestUtils.class); public static boolean createTopic(String nameSrvAddr, String clusterName, String topic, - int queueNum) { + int queueNum) { int defaultWaitTime = 5; return createTopic(nameSrvAddr, clusterName, topic, queueNum, defaultWaitTime); } public static boolean createTopic(String nameSrvAddr, String clusterName, String topic, - int queueNum, int waitTimeSec) { + int queueNum, int waitTimeSec) { boolean createResult = false; DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(); mqAdminExt.setInstanceName(UUID.randomUUID().toString()); @@ -108,12 +102,12 @@ public class MQAdminTestUtils { try { mqAdminExt.start(); Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdminExt, - clusterName); + clusterName); for (String addr : masterSet) { try { mqAdminExt.createAndUpdateSubscriptionGroupConfig(addr, config); log.info(String.format("create subscription group %s to %s success.\n", consumerId, - addr)); + addr)); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000 * 1); @@ -159,37 +153,10 @@ public class MQAdminTestUtils { return false; } - public void getSubConnection(String nameSrvAddr, String clusterName, String consumerId) { - boolean createResult = true; - DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(); - mqAdminExt.setNamesrvAddr(nameSrvAddr); - SubscriptionGroupConfig config = new SubscriptionGroupConfig(); - config.setGroupName(consumerId); - try { - mqAdminExt.start(); - Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdminExt, - clusterName); - for (String addr : masterSet) { - try { - - System.out.printf("create subscription group %s to %s success.\n", consumerId, - addr); - } catch (Exception e) { - e.printStackTrace(); - Thread.sleep(1000 * 1); - } - } - } catch (Exception e) { - createResult = false; - e.printStackTrace(); - } - ForkJoinPool.commonPool().execute(mqAdminExt::shutdown); - } - //should only be test, if some middle operation failed, it dose not backup the brokerConfigMap public static Map<String, TopicConfigAndQueueMapping> createStaticTopic(String topic, int queueNum, Set<String> targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception { Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); - assert brokerConfigMap.isEmpty(); + assert brokerConfigMap.isEmpty(); TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap); MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt); MQAdminUtils.updateTopicConfigMappingAll(brokerConfigMap, defaultMQAdminExt, false); @@ -221,20 +188,20 @@ public class MQAdminTestUtils { MQAdminUtils.checkIfMasterAlive(brokerConfigMap.keySet(), defaultMQAdminExt, clientMetadata); // now do the remapping //Step1: let the new leader can be write without the logicOffset - for (String broker: brokersToMapIn) { + for (String broker : brokersToMapIn) { String addr = clientMetadata.findMasterBrokerAddr(broker); TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); } //Step2: forbid the write of old leader - for (String broker: brokersToMapOut) { + for (String broker : brokersToMapOut) { String addr = clientMetadata.findMasterBrokerAddr(broker); TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); } //Step5: write the non-target brokers - for (String broker: brokerConfigMap.keySet()) { + for (String broker : brokerConfigMap.keySet()) { if (brokersToMapIn.contains(broker) || brokersToMapOut.contains(broker)) { continue; } @@ -244,25 +211,24 @@ public class MQAdminTestUtils { } } - - public static void createStaticTopicWithCommand(String topic, int queueNum, Set<String> brokers, String cluster, String nameservers) throws Exception { + public static void createStaticTopicWithCommand(String topic, int queueNum, Set<String> brokers, String cluster, String nameservers) throws Exception { UpdateStaticTopicSubCommand cmd = new UpdateStaticTopicSubCommand(); Options options = ServerUtil.buildCommandlineOptions(new Options()); String[] args; if (cluster != null) { - args = new String[] { - "-c", cluster, - "-t", topic, - "-qn", String.valueOf(queueNum), - "-n", nameservers + args = new String[]{ + "-c", cluster, + "-t", topic, + "-qn", String.valueOf(queueNum), + "-n", nameservers }; } else { String brokerStr = String.join(",", brokers); - args = new String[] { - "-b", brokerStr, - "-t", topic, - "-qn", String.valueOf(queueNum), - "-n", nameservers + args = new String[]{ + "-b", brokerStr, + "-t", topic, + "-qn", String.valueOf(queueNum), + "-n", nameservers }; } final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), args, cmd.buildCommandlineOptions(options), new PosixParser()); @@ -276,23 +242,22 @@ public class MQAdminTestUtils { cmd.execute(commandLine, options, null); } - public static void remappingStaticTopicWithCommand(String topic, Set<String> brokers, String cluster, String nameservers) throws Exception { RemappingStaticTopicSubCommand cmd = new RemappingStaticTopicSubCommand(); Options options = ServerUtil.buildCommandlineOptions(new Options()); String[] args; if (cluster != null) { - args = new String[] { - "-c", cluster, - "-t", topic, - "-n", nameservers + args = new String[]{ + "-c", cluster, + "-t", topic, + "-n", nameservers }; } else { String brokerStr = String.join(",", brokers); - args = new String[] { - "-b", brokerStr, - "-t", topic, - "-n", nameservers + args = new String[]{ + "-b", brokerStr, + "-t", topic, + "-n", nameservers }; } final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), args, cmd.buildCommandlineOptions(options), new PosixParser()); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 5c80e86..6f551d3 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -45,8 +45,6 @@ import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.apache.rocketmq.common.rpc.ClientMetadata; -import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.topic.TopicValidator; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index d1f0fcd..fce4318 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -64,11 +64,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.apache.rocketmq.common.rpc.ClientMetadata; -import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; -import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; -import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index c4838e3..35efe96 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -44,8 +44,6 @@ import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.apache.rocketmq.common.rpc.ClientMetadata; -import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.exception.RemotingCommandException; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java index 0fda471..a5aab4d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java @@ -1,15 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.tools.admin; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.rpc.ClientMetadata; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; @@ -160,7 +174,7 @@ public class MQAdminUtils { if (topicOffset == null) { throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader); } - //TODO check the max offset, will it return -1? + //TO DO check the max offset, will it return -1? if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) { throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset()); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java index a3b757a..ba4e54e 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java @@ -107,7 +107,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { } MQAdminUtils.remappingStaticTopic(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), 10000, force, defaultMQAdminExt); return; - }catch (Exception e) { + } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); } finally { defaultMQAdminExt.shutdown(); @@ -138,7 +138,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { try { defaultMQAdminExt.start(); - if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))) { + if (!commandLine.hasOption("b") && !commandLine.hasOption('c')) { ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); return; } @@ -184,14 +184,14 @@ public class RemappingStaticTopicSubCommand implements SubCommand { { TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), brokerConfigMap, new HashSet<String>(), new HashSet<String>()); String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false); - System.out.println("The old mapping data is written to file " + oldMappingDataFile); + System.out.printf("The old mapping data is written to file " + oldMappingDataFile + "\n"); } TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers); { String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true); - System.out.println("The old mapping data is written to file " + newMappingDataFile); + System.out.printf("The old mapping data is written to file " + newMappingDataFile + "\n"); } MQAdminUtils.completeNoTargetBrokers(newWrapper.getBrokerConfigMap(), defaultMQAdminExt); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index 410381e..d67b8fb 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -21,10 +21,9 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.rpc.ClientMetadata; import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; @@ -110,7 +109,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { MQAdminUtils.completeNoTargetBrokers(wrapper.getBrokerConfigMap(), defaultMQAdminExt); MQAdminUtils.updateTopicConfigMappingAll(wrapper.getBrokerConfigMap(), defaultMQAdminExt, false); return; - }catch (Exception e) { + } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); } finally { defaultMQAdminExt.shutdown(); @@ -185,7 +184,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { { TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), brokerConfigMap, new HashSet<String>(), new HashSet<String>()); String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false); - System.out.println("The old mapping data is written to file " + oldMappingDataFile); + System.out.printf("The old mapping data is written to file " + oldMappingDataFile + "\n"); } //add the existed brokers to target brokers targetBrokers.addAll(brokerConfigMap.keySet()); @@ -195,7 +194,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { { String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true); - System.out.println("The new mapping data is written to file " + newMappingDataFile); + System.out.printf("The new mapping data is written to file " + newMappingDataFile + "\n"); } MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt);
