This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 8c9a9d36b844390cccc2ab083ed63d5b222e256c Author: zhouxiang <[email protected]> AuthorDate: Tue Dec 6 16:50:59 2022 +0800 [ISSUE #5485] Remove ConsumerManagerInterface and ProducerManagerInterface --- .../rocketmq/broker/client/ConsumerManager.java | 13 +---- .../broker/client/ConsumerManagerInterface.java | 60 ---------------------- .../rocketmq/broker/client/ProducerManager.java | 12 +---- .../broker/client/ProducerManagerInterface.java | 44 ---------------- .../rocketmq/proxy/service/ServiceManager.java | 8 +-- .../service/client/ClusterConsumerManager.java | 9 ++-- .../proxy/service/sysmessage/HeartbeatSyncer.java | 12 ++--- .../service/sysmessage/HeartbeatSyncerTest.java | 4 +- 8 files changed, 18 insertions(+), 144 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index a70e8579e..787dcdbd2 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -38,7 +38,7 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.store.stats.BrokerStatsManager; -public class ConsumerManager implements ConsumerManagerInterface { +public class ConsumerManager { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final ConcurrentMap<String, ConsumerGroupInfo> consumerTable = new ConcurrentHashMap<>(1024); @@ -64,7 +64,6 @@ public class ConsumerManager implements ConsumerManagerInterface { this.subscriptionExpiredTimeout = brokerConfig.getSubscriptionExpiredTimeout(); } - @Override public ClientChannelInfo findChannel(final String group, final String clientId) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (consumerGroupInfo != null) { @@ -73,7 +72,6 @@ public class ConsumerManager implements ConsumerManagerInterface { return null; } - @Override public ClientChannelInfo findChannel(final String group, final Channel channel) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (consumerGroupInfo != null) { @@ -82,7 +80,6 @@ public class ConsumerManager implements ConsumerManagerInterface { return null; } - @Override public SubscriptionData findSubscriptionData(final String group, final String topic) { return findSubscriptionData(group, topic, true); } @@ -110,7 +107,6 @@ public class ConsumerManager implements ConsumerManagerInterface { return this.consumerTable; } - @Override public ConsumerGroupInfo getConsumerGroupInfo(final String group) { return getConsumerGroupInfo(group, false); } @@ -123,7 +119,6 @@ public class ConsumerManager implements ConsumerManagerInterface { return consumerGroupInfo; } - @Override public int findSubscriptionDataCount(final String group) { ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group); if (consumerGroupInfo != null) { @@ -133,7 +128,6 @@ public class ConsumerManager implements ConsumerManagerInterface { return 0; } - @Override public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) { boolean removed = false; Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); @@ -178,7 +172,6 @@ public class ConsumerManager implements ConsumerManagerInterface { isNotifyConsumerIdsChangedEnable, true); } - @Override public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable, boolean updateSubscription) { @@ -214,7 +207,6 @@ public class ConsumerManager implements ConsumerManagerInterface { return r1 || r2; } - @Override public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo, boolean isNotifyConsumerIdsChangedEnable) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); @@ -260,7 +252,6 @@ public class ConsumerManager implements ConsumerManagerInterface { } } - @Override public void scanNotActiveChannel() { Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { @@ -295,7 +286,6 @@ public class ConsumerManager implements ConsumerManagerInterface { removeExpireConsumerGroupInfo(); } - @Override public HashSet<String> queryTopicConsumeByWho(final String topic) { HashSet<String> groups = new HashSet<>(); Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); @@ -310,7 +300,6 @@ public class ConsumerManager implements ConsumerManagerInterface { return groups; } - @Override public void appendConsumerIdsChangeListener(ConsumerIdsChangeListener listener) { consumerIdsChangeListenerList.add(listener); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java deleted file mode 100644 index 6998f60e7..000000000 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.broker.client; - -import io.netty.channel.Channel; -import java.util.Set; -import org.apache.rocketmq.common.consumer.ConsumeFromWhere; -import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; -import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; -import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; - -public interface ConsumerManagerInterface { - - ClientChannelInfo findChannel(String group, String clientId); - - ClientChannelInfo findChannel(String group, Channel channel); - - SubscriptionData findSubscriptionData(String group, String topic); - - ConsumerGroupInfo getConsumerGroupInfo(String group); - - int findSubscriptionDataCount(String group); - - boolean doChannelCloseEvent(String remoteAddr, Channel channel); - - default boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo, - ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, - Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) { - return registerConsumer(group, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList, - isNotifyConsumerIdsChangedEnable, true); - } - - boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo, - ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, - Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable, boolean updateSubscription); - - void unregisterConsumer(String group, ClientChannelInfo clientChannelInfo, - boolean isNotifyConsumerIdsChangedEnable); - - void scanNotActiveChannel(); - - Set<String> queryTopicConsumeByWho(String topic); - - void appendConsumerIdsChangeListener(ConsumerIdsChangeListener listener); -} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java index a3ed9c590..52d67bf28 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java @@ -35,7 +35,7 @@ import org.apache.rocketmq.remoting.protocol.body.ProducerInfo; import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo; import org.apache.rocketmq.store.stats.BrokerStatsManager; -public class ProducerManager implements ProducerManagerInterface { +public class ProducerManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; private static final int GET_AVAILABLE_CHANNEL_RETRY_COUNT = 3; @@ -54,12 +54,10 @@ public class ProducerManager implements ProducerManagerInterface { this.brokerStatsManager = brokerStatsManager; } - @Override public int groupSize() { return this.groupChannelTable.size(); } - @Override public boolean groupOnline(String group) { Map<Channel, ClientChannelInfo> channels = this.groupChannelTable.get(group); return channels != null && !channels.isEmpty(); @@ -69,7 +67,6 @@ public class ProducerManager implements ProducerManagerInterface { return groupChannelTable; } - @Override public ProducerTableInfo getProducerTable() { Map<String, List<ProducerInfo>> map = new HashMap<>(); for (String group : this.groupChannelTable.keySet()) { @@ -97,7 +94,6 @@ public class ProducerManager implements ProducerManagerInterface { return new ProducerTableInfo(map); } - @Override public void scanNotActiveChannel() { Iterator<Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>>> iterator = this.groupChannelTable.entrySet().iterator(); @@ -133,7 +129,6 @@ public class ProducerManager implements ProducerManagerInterface { } } - @Override public synchronized boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) { boolean removed = false; if (channel != null) { @@ -165,7 +160,6 @@ public class ProducerManager implements ProducerManagerInterface { return removed; } - @Override public synchronized void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) { ClientChannelInfo clientChannelInfoFound = null; @@ -189,7 +183,6 @@ public class ProducerManager implements ProducerManagerInterface { } } - @Override public synchronized void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) { ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group); if (null != channelTable && !channelTable.isEmpty()) { @@ -209,7 +202,6 @@ public class ProducerManager implements ProducerManagerInterface { } } - @Override public Channel getAvailableChannel(String groupId) { if (groupId == null) { return null; @@ -250,7 +242,6 @@ public class ProducerManager implements ProducerManagerInterface { return lastActiveChannel; } - @Override public Channel findChannel(String clientId) { return clientChannelTable.get(clientId); } @@ -266,7 +257,6 @@ public class ProducerManager implements ProducerManagerInterface { } } - @Override public void appendProducerChangeListener(ProducerChangeListener producerChangeListener) { producerChangeListenerList.add(producerChangeListener); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java deleted file mode 100644 index 5e2e7e5b0..000000000 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.broker.client; - -import io.netty.channel.Channel; -import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo; - -public interface ProducerManagerInterface { - - int groupSize(); - - boolean groupOnline(String group); - - ProducerTableInfo getProducerTable(); - - void scanNotActiveChannel(); - - boolean doChannelCloseEvent(String remoteAddr, Channel channel); - - void registerProducer(String group, ClientChannelInfo clientChannelInfo); - - void unregisterProducer(String group, ClientChannelInfo clientChannelInfo); - - Channel getAvailableChannel(String groupId); - - Channel findChannel(String clientId); - - void appendProducerChangeListener(ProducerChangeListener producerChangeListener); -} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java index ce84832ca..bfa2ed963 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManager.java @@ -16,8 +16,8 @@ */ package org.apache.rocketmq.proxy.service; -import org.apache.rocketmq.broker.client.ConsumerManagerInterface; -import org.apache.rocketmq.broker.client.ProducerManagerInterface; +import org.apache.rocketmq.broker.client.ConsumerManager; +import org.apache.rocketmq.broker.client.ProducerManager; import org.apache.rocketmq.proxy.common.StartAndShutdown; import org.apache.rocketmq.proxy.service.admin.AdminService; import org.apache.rocketmq.proxy.service.message.MessageService; @@ -31,9 +31,9 @@ public interface ServiceManager extends StartAndShutdown { TopicRouteService getTopicRouteService(); - ProducerManagerInterface getProducerManager(); + ProducerManager getProducerManager(); - ConsumerManagerInterface getConsumerManager(); + ConsumerManager getConsumerManager(); TransactionService getTransactionService(); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java index 3a98b5ee1..94f4c5232 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java @@ -21,18 +21,17 @@ import java.util.Set; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; import org.apache.rocketmq.broker.client.ConsumerManager; -import org.apache.rocketmq.broker.client.ConsumerManagerInterface; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; -import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; -import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; -import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.proxy.common.StartAndShutdown; import org.apache.rocketmq.proxy.service.admin.AdminService; import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory; import org.apache.rocketmq.proxy.service.route.TopicRouteService; import org.apache.rocketmq.proxy.service.sysmessage.HeartbeatSyncer; +import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; -public class ClusterConsumerManager extends ConsumerManager implements ConsumerManagerInterface, StartAndShutdown { +public class ClusterConsumerManager extends ConsumerManager implements StartAndShutdown { protected HeartbeatSyncer heartbeatSyncer; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java index 041cbcee6..9e333902e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java @@ -28,14 +28,11 @@ import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupEvent; import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; -import org.apache.rocketmq.broker.client.ConsumerManagerInterface; +import org.apache.rocketmq.broker.client.ConsumerManager; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; -import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; -import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.apache.rocketmq.proxy.common.channel.ChannelHelper; import org.apache.rocketmq.proxy.config.ConfigurationManager; @@ -44,15 +41,18 @@ import org.apache.rocketmq.proxy.processor.channel.RemoteChannel; import org.apache.rocketmq.proxy.service.admin.AdminService; import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory; import org.apache.rocketmq.proxy.service.route.TopicRouteService; +import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; public class HeartbeatSyncer extends AbstractSystemMessageSyncer { protected ThreadPoolExecutor threadPoolExecutor; - protected ConsumerManagerInterface consumerManager; + protected ConsumerManager consumerManager; protected final Map<String /* channelId as longText */, RemoteChannel> remoteChannelMap = new ConcurrentHashMap<>(); public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService adminService, - ConsumerManagerInterface consumerManager, MQClientAPIFactory mqClientAPIFactory) { + ConsumerManager consumerManager, MQClientAPIFactory mqClientAPIFactory) { super(topicRouteService, adminService, mqClientAPIFactory); this.consumerManager = consumerManager; this.init(); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java index 45e3942d6..95152186d 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java @@ -34,7 +34,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.RandomStringUtils; import org.apache.rocketmq.broker.client.ClientChannelInfo; -import org.apache.rocketmq.broker.client.ConsumerManagerInterface; +import org.apache.rocketmq.broker.client.ConsumerManager; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; @@ -94,7 +94,7 @@ public class HeartbeatSyncerTest extends InitConfigTest { @Mock private AdminService adminService; @Mock - private ConsumerManagerInterface consumerManager; + private ConsumerManager consumerManager; @Mock private MQClientAPIFactory mqClientAPIFactory; @Mock
