This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 775eb64b87 [ISSUE #9288] Support the disablement of producer 
registration and fast channel shutdown (#9293)
775eb64b87 is described below

commit 775eb64b877f6eb20b76e02e78c3ea14119806d9
Author: ymwneu <[email protected]>
AuthorDate: Fri Apr 11 15:45:00 2025 +0800

    [ISSUE #9288] Support the disablement of producer registration and fast 
channel shutdown (#9293)
---
 .../client/ClientChannelAttributeHelper.java       | 77 ++++++++++++++++++++
 .../rocketmq/broker/client/ConsumerManager.java    | 39 ++++++++++
 .../rocketmq/broker/client/ProducerManager.java    | 82 +++++++++++++++++++---
 .../broker/client/ProducerManagerTest.java         | 19 ++++-
 .../rocketmq/client/impl/MQClientManager.java      |  4 ++
 .../client/impl/factory/MQClientInstance.java      |  8 +++
 .../org/apache/rocketmq/common/BrokerConfig.java   | 38 ++++++++++
 7 files changed, 257 insertions(+), 10 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelAttributeHelper.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelAttributeHelper.java
new file mode 100644
index 0000000000..29085398d0
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelAttributeHelper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.client;
+
+import io.netty.channel.Channel;
+import io.netty.util.AttributeKey;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class ClientChannelAttributeHelper {
+    private static final AttributeKey<String> ATTR_CG = 
AttributeKey.valueOf("CHANNEL_CONSUMER_GROUP");
+    private static final AttributeKey<String> ATTR_PG = 
AttributeKey.valueOf("CHANNEL_PRODUCER_GROUP");
+    private static final String SEPARATOR = "|";
+
+    public static void addProducerGroup(Channel channel, String group) {
+        addGroup(channel, group, ATTR_PG);
+    }
+
+    public static void addConsumerGroup(Channel channel, String group) {
+        addGroup(channel, group, ATTR_CG);
+    }
+
+    public static List<String> getProducerGroups(Channel channel) {
+        return getGroups(channel, ATTR_PG);
+    }
+
+    public static List<String> getConsumerGroups(Channel channel) {
+        return getGroups(channel, ATTR_CG);
+    }
+
+    private static void addGroup(Channel channel, String group, 
AttributeKey<String> key) {
+        if (null == channel || !channel.isActive()) {  // no side effect if 
check active status.
+            return;
+        }
+        if (null == group || group.length() == 0 || null == key) {
+            return;
+        }
+        String groups = channel.attr(key).get();
+        if (null == groups) {
+            channel.attr(key).set(group + SEPARATOR);
+        } else {
+            if (groups.contains(SEPARATOR + group + SEPARATOR)) {
+                return;
+            } else {
+                channel.attr(key).compareAndSet(groups, groups + group + 
SEPARATOR);
+            }
+        }
+    }
+
+    private static List<String> getGroups(Channel channel, 
AttributeKey<String> key) {
+        if (null == channel) {
+            return Collections.emptyList();
+        }
+        if (null == key) {
+            return Collections.emptyList();
+        }
+        String groups = channel.attr(key).get();
+        return null == groups ? Collections.<String>emptyList() : 
Arrays.asList(groups.split("\\|"));
+    }
+
+}
\ No newline at end of file
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java 
b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index b1057e2a8d..c658b128eb 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -48,12 +48,14 @@ public class ConsumerManager {
     protected final BrokerStatsManager brokerStatsManager;
     private final long channelExpiredTimeout;
     private final long subscriptionExpiredTimeout;
+    private final BrokerConfig brokerConfig;
 
     public ConsumerManager(final ConsumerIdsChangeListener 
consumerIdsChangeListener, long expiredTimeout) {
         this.consumerIdsChangeListenerList.add(consumerIdsChangeListener);
         this.brokerStatsManager = null;
         this.channelExpiredTimeout = expiredTimeout;
         this.subscriptionExpiredTimeout = expiredTimeout;
+        this.brokerConfig = null;
     }
 
     public ConsumerManager(final ConsumerIdsChangeListener 
consumerIdsChangeListener,
@@ -62,6 +64,7 @@ public class ConsumerManager {
         this.brokerStatsManager = brokerStatsManager;
         this.channelExpiredTimeout = brokerConfig.getChannelExpiredTimeout();
         this.subscriptionExpiredTimeout = 
brokerConfig.getSubscriptionExpiredTimeout();
+        this.brokerConfig = brokerConfig;
     }
 
     public ClientChannelInfo findChannel(final String group, final String 
clientId) {
@@ -130,12 +133,43 @@ public class ConsumerManager {
 
     public boolean doChannelCloseEvent(final String remoteAddr, final Channel 
channel) {
         boolean removed = false;
+        if (this.brokerConfig != null && 
this.brokerConfig.isEnableFastChannelEventProcess()) {
+            List<String> groups = 
ClientChannelAttributeHelper.getConsumerGroups(channel);
+            if (this.brokerConfig.isPrintChannelGroups() && groups.size() >= 5 
&& groups.size() >= this.brokerConfig.getPrintChannelGroupsMinNum()) {
+                LOGGER.warn("channel close event, too many consumer groups one 
channel, {}, {}, {}", groups.size(), remoteAddr, groups);
+            }
+            for (String group : groups) {
+                if (null == group || group.length() == 0) {
+                    continue;
+                }
+                ConsumerGroupInfo consumerGroupInfo = 
this.consumerTable.get(group);
+                if (null == consumerGroupInfo) {
+                    continue;
+                }
+                ClientChannelInfo clientChannelInfo = 
consumerGroupInfo.doChannelCloseEvent(remoteAddr, channel);
+                if (clientChannelInfo != null) {
+                    removed = true;
+                    
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, group, 
clientChannelInfo, consumerGroupInfo.getSubscribeTopics());
+                    if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
+                        ConsumerGroupInfo remove = 
this.consumerTable.remove(group);
+                        if (remove != null) {
+                            LOGGER.info("unregister consumer ok, no any 
connection, and remove consumer group, {}",
+                                    group);
+                            
callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group);
+                        }
+                    }
+                    callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, 
group, consumerGroupInfo.getAllChannel());
+                }
+            }
+            return removed;
+        }
         Iterator<Entry<String, ConsumerGroupInfo>> it = 
this.consumerTable.entrySet().iterator();
         while (it.hasNext()) {
             Entry<String, ConsumerGroupInfo> next = it.next();
             ConsumerGroupInfo info = next.getValue();
             ClientChannelInfo clientChannelInfo = 
info.doChannelCloseEvent(remoteAddr, channel);
             if (clientChannelInfo != null) {
+                removed = true;
                 
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, 
next.getKey(), clientChannelInfo, info.getSubscribeTopics());
                 if (info.getChannelInfoTable().isEmpty()) {
                     ConsumerGroupInfo remove = 
this.consumerTable.remove(next.getKey());
@@ -201,6 +235,11 @@ public class ConsumerManager {
                 callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, 
group, consumerGroupInfo.getAllChannel());
             }
         }
+
+        if (this.brokerConfig != null && 
this.brokerConfig.isEnableFastChannelEventProcess() && r1) {
+            
ClientChannelAttributeHelper.addConsumerGroup(clientChannelInfo.getChannel(), 
group);
+        }
+
         if (null != this.brokerStatsManager) {
             this.brokerStatsManager.incConsumerRegisterTime((int) 
(System.currentTimeMillis() - start));
         }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java 
b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 2c3acb6ba9..bc8400c19a 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -44,15 +45,23 @@ public class ProducerManager {
         new ConcurrentHashMap<>();
     private final ConcurrentMap<String, Channel> clientChannelTable = new 
ConcurrentHashMap<>();
     protected final BrokerStatsManager brokerStatsManager;
+    private final BrokerConfig brokerConfig;
     private final PositiveAtomicCounter positiveAtomicCounter = new 
PositiveAtomicCounter();
     private final List<ProducerChangeListener> producerChangeListenerList = 
new CopyOnWriteArrayList<>();
 
     public ProducerManager() {
         this.brokerStatsManager = null;
+        this.brokerConfig = null;
     }
 
     public ProducerManager(final BrokerStatsManager brokerStatsManager) {
         this.brokerStatsManager = brokerStatsManager;
+        this.brokerConfig = null;
+    }
+
+    public ProducerManager(final BrokerStatsManager brokerStatsManager, final 
BrokerConfig brokerConfig) {
+        this.brokerStatsManager = brokerStatsManager;
+        this.brokerConfig = brokerConfig;
     }
 
     public int groupSize() {
@@ -136,6 +145,39 @@ public class ProducerManager {
     public boolean doChannelCloseEvent(final String remoteAddr, final Channel 
channel) {
         boolean removed = false;
         if (channel != null) {
+            if (this.brokerConfig != null && 
this.brokerConfig.isEnableFastChannelEventProcess()) {
+                List<String> groups = 
ClientChannelAttributeHelper.getProducerGroups(channel);
+                if (this.brokerConfig.isPrintChannelGroups() && groups.size() 
>= 5 && groups.size() >= this.brokerConfig.getPrintChannelGroupsMinNum()) {
+                    log.warn("channel close event, too many producer groups 
one channel, {}, {}, {}", groups.size(), remoteAddr, groups);
+                }
+                for (String group : groups) {
+                    if (null == group || group.length() == 0) {
+                        continue;
+                    }
+                    ConcurrentMap<Channel, ClientChannelInfo> 
clientChannelInfoTable = this.groupChannelTable.get(group);
+                    if (null == clientChannelInfoTable) {
+                        continue;
+                    }
+                    final ClientChannelInfo clientChannelInfo =
+                            clientChannelInfoTable.remove(channel);
+                    if (clientChannelInfo != null) {
+                        
clientChannelTable.remove(clientChannelInfo.getClientId());
+                        removed = true;
+                        log.info(
+                                "NETTY EVENT: remove channel[{}][{}] from 
ProducerManager groupChannelTable, producer group: {}",
+                                clientChannelInfo.toString(), remoteAddr, 
group);
+                        
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, 
clientChannelInfo);
+                        if (clientChannelInfoTable.isEmpty()) {
+                            ConcurrentMap<Channel, ClientChannelInfo> 
oldGroupTable = this.groupChannelTable.remove(group);
+                            if (oldGroupTable != null) {
+                                log.info("unregister a producer group[{}] from 
groupChannelTable", group);
+                                
callProducerChangeListener(ProducerGroupEvent.GROUP_UNREGISTER, group, null);
+                            }
+                        }
+                    }
+                }
+                return removed;  // must return here, degrade to 
scanNotActiveChannel at worst.
+            }
             for (final Map.Entry<String, ConcurrentMap<Channel, 
ClientChannelInfo>> entry : this.groupChannelTable.entrySet()) {
                 final String group = entry.getKey();
                 final ConcurrentMap<Channel, ClientChannelInfo> 
clientChannelInfoTable = entry.getValue();
@@ -162,20 +204,37 @@ public class ProducerManager {
     }
 
     public void registerProducer(final String group, final ClientChannelInfo 
clientChannelInfo) {
+
+        long start = System.currentTimeMillis();
         ClientChannelInfo clientChannelInfoFound;
 
         ConcurrentMap<Channel, ClientChannelInfo> channelTable = 
this.groupChannelTable.get(group);
+        // note that we must take care of the exist groups and channels,
+        // only can return when groups or channels not exist.
+        if (this.brokerConfig != null
+                && !this.brokerConfig.isEnableRegisterProducer()
+                && this.brokerConfig.isRejectTransactionMessage()) {
+            boolean needRegister = true;
+            if (null == channelTable) {
+                needRegister = false;
+            } else {
+                clientChannelInfoFound = 
channelTable.get(clientChannelInfo.getChannel());
+                if (null == clientChannelInfoFound) {
+                    needRegister = false;
+                }
+            }
+            if (!needRegister) {
+                if (null != this.brokerStatsManager) {
+                    this.brokerStatsManager.incProducerRegisterTime((int) 
(System.currentTimeMillis() - start));
+                }
+                return;
+            }
+        }
+
         if (null == channelTable) {
             channelTable = new ConcurrentHashMap<>();
-            // Make sure channelTable will NOT be cleaned by 
#scanNotActiveChannel
-            channelTable.put(clientChannelInfo.getChannel(), 
clientChannelInfo);
             ConcurrentMap<Channel, ClientChannelInfo> prev = 
this.groupChannelTable.putIfAbsent(group, channelTable);
-            if (null == prev) {
-                // Add client-id to channel mapping for new producer group
-                clientChannelTable.put(clientChannelInfo.getClientId(), 
clientChannelInfo.getChannel());
-            } else {
-                channelTable = prev;
-            }
+            channelTable = prev != null ? prev : channelTable;
         }
 
         clientChannelInfoFound = 
channelTable.get(clientChannelInfo.getChannel());
@@ -184,12 +243,19 @@ public class ProducerManager {
             channelTable.put(clientChannelInfo.getChannel(), 
clientChannelInfo);
             clientChannelTable.put(clientChannelInfo.getClientId(), 
clientChannelInfo.getChannel());
             log.info("new producer connected, group: {} channel: {}", group, 
clientChannelInfo.toString());
+            if (this.brokerConfig != null && 
this.brokerConfig.isEnableFastChannelEventProcess()) {
+                
ClientChannelAttributeHelper.addProducerGroup(clientChannelInfo.getChannel(), 
group);
+            }
         }
 
         // Refresh existing client-channel-info update-timestamp
         if (clientChannelInfoFound != null) {
             
clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
         }
+
+        if (null != this.brokerStatsManager) {
+            this.brokerStatsManager.incProducerRegisterTime((int) 
(System.currentTimeMillis() - start));
+        }
     }
 
     public void unregisterProducer(final String group, final ClientChannelInfo 
clientChannelInfo) {
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
index 3d6091e02f..451b0e044c 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
@@ -22,6 +22,8 @@ import java.lang.reflect.Field;
 import java.util.Map;
 
 import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.junit.Before;
 import org.junit.Test;
@@ -36,6 +38,8 @@ import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class ProducerManagerTest {
+
+    private BrokerConfig brokerConfig;
     private ProducerManager producerManager;
     private String group = "FooBar";
     private ClientChannelInfo clientInfo;
@@ -45,7 +49,8 @@ public class ProducerManagerTest {
 
     @Before
     public void init() {
-        producerManager = new ProducerManager();
+        brokerConfig = new BrokerConfig();
+        producerManager = new ProducerManager(null, brokerConfig);
         clientInfo = new ClientChannelInfo(channel, "clientId", 
LanguageCode.JAVA, 0);
     }
 
@@ -140,10 +145,20 @@ public class ProducerManagerTest {
     }
 
     @Test
-    public void testRegisterProducer() throws Exception {
+    public void testRegisterProducer() {
+        brokerConfig.setEnableRegisterProducer(false);
+        brokerConfig.setRejectTransactionMessage(true);
         producerManager.registerProducer(group, clientInfo);
         Map<Channel, ClientChannelInfo> channelMap = 
producerManager.getGroupChannelTable().get(group);
         Channel channel1 = producerManager.findChannel("clientId");
+        assertThat(channelMap).isNull();
+        assertThat(channel1).isNull();
+
+        brokerConfig.setEnableRegisterProducer(true);
+        brokerConfig.setRejectTransactionMessage(false);
+        producerManager.registerProducer(group, clientInfo);
+        channelMap = producerManager.getGroupChannelTable().get(group);
+        channel1 = producerManager.findChannel("clientId");
         assertThat(channelMap).isNotNull();
         assertThat(channel1).isNotNull();
         assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
index 02eaa66e99..ca6f461745 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
@@ -85,4 +85,8 @@ public class MQClientManager {
     public void removeClientFactory(final String clientId) {
         this.factoryTable.remove(clientId);
     }
+
+    public ConcurrentMap<String, MQClientInstance> getFactoryTable() {
+        return factoryTable;
+    }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index d2a4694bb0..3055f2cdee 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1395,6 +1395,14 @@ public class MQClientInstance {
         return clientConfig;
     }
 
+    public ConcurrentMap<String, MQProducerInner> getProducerTable() {
+        return producerTable;
+    }
+
+    public ConcurrentMap<String, MQConsumerInner> getConsumerTable() {
+        return consumerTable;
+    }
+
     public TopicRouteData queryTopicRouteData(String topic) {
         TopicRouteData data = this.getAnExistTopicRouteData(topic);
         if (data == null) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 44f5e1eff0..a411ad496b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -428,6 +428,10 @@ public class BrokerConfig extends BrokerIdentity {
     private long popInflightMessageThreshold = 10000;
     private boolean enablePopMessageThreshold = false;
 
+    private boolean enableFastChannelEventProcess = false;
+    private boolean printChannelGroups = false;
+    private int printChannelGroupsMinNum = 5;
+
     private int splitRegistrationSize = 800;
 
     /**
@@ -457,6 +461,8 @@ public class BrokerConfig extends BrokerIdentity {
 
     private boolean recallMessageEnable = false;
 
+    private boolean enableRegisterProducer = true;
+
     private boolean enableCreateSysGroup = true;
 
     public String getConfigBlackList() {
@@ -1915,6 +1921,30 @@ public class BrokerConfig extends BrokerIdentity {
         this.enableSplitRegistration = enableSplitRegistration;
     }
 
+    public boolean isEnableFastChannelEventProcess() {
+        return enableFastChannelEventProcess;
+    }
+
+    public void setEnableFastChannelEventProcess(boolean 
enableFastChannelEventProcess) {
+        this.enableFastChannelEventProcess = enableFastChannelEventProcess;
+    }
+
+    public boolean isPrintChannelGroups() {
+        return printChannelGroups;
+    }
+
+    public void setPrintChannelGroups(boolean printChannelGroups) {
+        this.printChannelGroups = printChannelGroups;
+    }
+
+    public int getPrintChannelGroupsMinNum() {
+        return printChannelGroupsMinNum;
+    }
+
+    public void setPrintChannelGroupsMinNum(int printChannelGroupsMinNum) {
+        this.printChannelGroupsMinNum = printChannelGroupsMinNum;
+    }
+
     public int getSplitRegistrationSize() {
         return splitRegistrationSize;
     }
@@ -2019,6 +2049,14 @@ public class BrokerConfig extends BrokerIdentity {
         this.recallMessageEnable = recallMessageEnable;
     }
 
+    public boolean isEnableRegisterProducer() {
+        return enableRegisterProducer;
+    }
+
+    public void setEnableRegisterProducer(boolean enableRegisterProducer) {
+        this.enableRegisterProducer = enableRegisterProducer;
+    }
+
     public boolean isEnableCreateSysGroup() {
         return enableCreateSysGroup;
     }

Reply via email to