This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4645942  [ISSUE #2748] Fix deleteSubscriptionGroup not remove consumer 
offset
4645942 is described below

commit 46459426801d582b5ef71c8a3e38267ccd6caad9
Author: panzhi <[email protected]>
AuthorDate: Wed Mar 24 11:39:12 2021 +0800

    [ISSUE #2748] Fix deleteSubscriptionGroup not remove consumer offset
---
 .../rocketmq/broker/offset/ConsumerOffsetManager.java    | 16 ++++++++++++++++
 .../rocketmq/broker/processor/AdminBrokerProcessor.java  |  4 ++++
 .../org/apache/rocketmq/client/impl/MQClientAPIImpl.java |  3 ++-
 .../header/DeleteSubscriptionGroupRequestHeader.java     | 10 ++++++++++
 .../apache/rocketmq/tools/admin/DefaultMQAdminExt.java   |  7 +++++++
 .../rocketmq/tools/admin/DefaultMQAdminExtImpl.java      |  9 ++++++++-
 .../java/org/apache/rocketmq/tools/admin/MQAdminExt.java |  3 +++
 .../command/consumer/DeleteSubscriptionGroupCommand.java | 13 +++++++++++--
 8 files changed, 61 insertions(+), 4 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index ebc9dd8..bd05758 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -232,4 +232,20 @@ public class ConsumerOffsetManager extends ConfigManager {
         }
     }
 
+    public void removeOffset(final String group) {
+        Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = 
this.offsetTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
+            String topicAtGroup = next.getKey();
+            if (topicAtGroup.contains(group)) {
+                String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+                if (arrays.length == 2 && group.equals(arrays[1])) {
+                    it.remove();
+                    log.warn("clean group offset {}", topicAtGroup);
+                }
+            }
+        }
+
+    }
+
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index dcdb701..0a1d214 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -714,6 +714,10 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
 
         
this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(requestHeader.getGroupName());
 
+        if (requestHeader.isRemoveOffset()) {
+            
this.brokerController.getConsumerOffsetManager().removeOffset(requestHeader.getGroupName());
+        }
+
         if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) 
{
             
this.brokerController.getBrokerStatsManager().onGroupDeleted(requestHeader.getGroupName());
         }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 7a4d556..63b2045 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1467,10 +1467,11 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-    public void deleteSubscriptionGroup(final String addr, final String 
groupName, final long timeoutMillis)
+    public void deleteSubscriptionGroup(final String addr, final String 
groupName, final boolean removeOffset, final long timeoutMillis)
         throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
         DeleteSubscriptionGroupRequestHeader requestHeader = new 
DeleteSubscriptionGroupRequestHeader();
         requestHeader.setGroupName(groupName);
+        requestHeader.setRemoveOffset(removeOffset);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
index dff9e2f..6591d77 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
@@ -25,6 +25,8 @@ public class DeleteSubscriptionGroupRequestHeader implements 
CommandCustomHeader
     @CFNotNull
     private String groupName;
 
+    private boolean removeOffset;
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
@@ -36,4 +38,12 @@ public class DeleteSubscriptionGroupRequestHeader implements 
CommandCustomHeader
     public void setGroupName(String groupName) {
         this.groupName = groupName;
     }
+
+    public boolean isRemoveOffset() {
+        return removeOffset;
+    }
+
+    public void setRemoveOffset(boolean removeOffset) {
+        this.removeOffset = removeOffset;
+    }
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index e80a813..8b1c228 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -321,6 +321,13 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
     }
 
     @Override
+    public void deleteSubscriptionGroup(String addr,
+        String groupName, boolean removeOffset) throws RemotingException, 
MQBrokerException, InterruptedException,
+        MQClientException {
+        defaultMQAdminExtImpl.deleteSubscriptionGroup(addr, groupName, 
removeOffset);
+    }
+
+    @Override
     public void createAndUpdateKvConfig(String namespace, String key,
         String value) throws RemotingException, MQBrokerException,
         InterruptedException, MQClientException {
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 22d4005..5c34370 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -424,7 +424,14 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
     public void deleteSubscriptionGroup(String addr,
         String groupName) throws RemotingException, MQBrokerException, 
InterruptedException,
         MQClientException {
-        
this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, 
groupName, timeoutMillis);
+        
this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, 
groupName, false, timeoutMillis);
+    }
+
+    @Override
+    public void deleteSubscriptionGroup(String addr,
+        String groupName, boolean removeOffset) throws RemotingException, 
MQBrokerException, InterruptedException,
+        MQClientException {
+        
this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, 
groupName, removeOffset, timeoutMillis);
     }
 
     @Override
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 17b6225..d5462cb 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -152,6 +152,9 @@ public interface MQAdminExt extends MQAdmin {
     void deleteSubscriptionGroup(final String addr, String groupName) throws 
RemotingException, MQBrokerException,
         InterruptedException, MQClientException;
 
+    void deleteSubscriptionGroup(final String addr, String groupName, boolean 
removeOffset) throws RemotingException, MQBrokerException,
+        InterruptedException, MQClientException;
+
     void createAndUpdateKvConfig(String namespace, String key,
         String value) throws RemotingException, MQBrokerException,
         InterruptedException, MQClientException;
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
index 96d8195..fb0efeb 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
@@ -54,6 +54,10 @@ public class DeleteSubscriptionGroupCommand implements 
SubCommand {
         opt.setRequired(true);
         options.addOption(opt);
 
+        opt = new Option("r", "removeOffset", true, "remove offset");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
@@ -65,11 +69,16 @@ public class DeleteSubscriptionGroupCommand implements 
SubCommand {
             // groupName
             String groupName = commandLine.getOptionValue('g').trim();
 
+            boolean removeOffset = false;
+            if (commandLine.hasOption('r')) {
+                removeOffset = 
Boolean.valueOf(commandLine.getOptionValue("r").trim());
+            }
+
             if (commandLine.hasOption('b')) {
                 String addr = commandLine.getOptionValue('b').trim();
                 adminExt.start();
 
-                adminExt.deleteSubscriptionGroup(addr, groupName);
+                adminExt.deleteSubscriptionGroup(addr, groupName, 
removeOffset);
                 System.out.printf("delete subscription group [%s] from broker 
[%s] success.%n", groupName,
                     addr);
 
@@ -80,7 +89,7 @@ public class DeleteSubscriptionGroupCommand implements 
SubCommand {
 
                 Set<String> masterSet = 
CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName);
                 for (String master : masterSet) {
-                    adminExt.deleteSubscriptionGroup(master, groupName);
+                    adminExt.deleteSubscriptionGroup(master, groupName, 
removeOffset);
                     System.out.printf(
                         "delete subscription group [%s] from broker [%s] in 
cluster [%s] success.%n",
                         groupName, master, clusterName);

Reply via email to