This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 4645942 [ISSUE #2748] Fix deleteSubscriptionGroup not remove consumer
offset
4645942 is described below
commit 46459426801d582b5ef71c8a3e38267ccd6caad9
Author: panzhi <[email protected]>
AuthorDate: Wed Mar 24 11:39:12 2021 +0800
[ISSUE #2748] Fix deleteSubscriptionGroup not remove consumer offset
---
.../rocketmq/broker/offset/ConsumerOffsetManager.java | 16 ++++++++++++++++
.../rocketmq/broker/processor/AdminBrokerProcessor.java | 4 ++++
.../org/apache/rocketmq/client/impl/MQClientAPIImpl.java | 3 ++-
.../header/DeleteSubscriptionGroupRequestHeader.java | 10 ++++++++++
.../apache/rocketmq/tools/admin/DefaultMQAdminExt.java | 7 +++++++
.../rocketmq/tools/admin/DefaultMQAdminExtImpl.java | 9 ++++++++-
.../java/org/apache/rocketmq/tools/admin/MQAdminExt.java | 3 +++
.../command/consumer/DeleteSubscriptionGroupCommand.java | 13 +++++++++++--
8 files changed, 61 insertions(+), 4 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index ebc9dd8..bd05758 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -232,4 +232,20 @@ public class ConsumerOffsetManager extends ConfigManager {
}
}
+ public void removeOffset(final String group) {
+ Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it =
this.offsetTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
+ String topicAtGroup = next.getKey();
+ if (topicAtGroup.contains(group)) {
+ String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+ if (arrays.length == 2 && group.equals(arrays[1])) {
+ it.remove();
+ log.warn("clean group offset {}", topicAtGroup);
+ }
+ }
+ }
+
+ }
+
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index dcdb701..0a1d214 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -714,6 +714,10 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(requestHeader.getGroupName());
+ if (requestHeader.isRemoveOffset()) {
+
this.brokerController.getConsumerOffsetManager().removeOffset(requestHeader.getGroupName());
+ }
+
if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats())
{
this.brokerController.getBrokerStatsManager().onGroupDeleted(requestHeader.getGroupName());
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 7a4d556..63b2045 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1467,10 +1467,11 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
- public void deleteSubscriptionGroup(final String addr, final String
groupName, final long timeoutMillis)
+ public void deleteSubscriptionGroup(final String addr, final String
groupName, final boolean removeOffset, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
DeleteSubscriptionGroupRequestHeader requestHeader = new
DeleteSubscriptionGroupRequestHeader();
requestHeader.setGroupName(groupName);
+ requestHeader.setRemoveOffset(removeOffset);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP,
requestHeader);
RemotingCommand response =
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
addr),
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
index dff9e2f..6591d77 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
@@ -25,6 +25,8 @@ public class DeleteSubscriptionGroupRequestHeader implements
CommandCustomHeader
@CFNotNull
private String groupName;
+ private boolean removeOffset;
+
@Override
public void checkFields() throws RemotingCommandException {
}
@@ -36,4 +38,12 @@ public class DeleteSubscriptionGroupRequestHeader implements
CommandCustomHeader
public void setGroupName(String groupName) {
this.groupName = groupName;
}
+
+ public boolean isRemoveOffset() {
+ return removeOffset;
+ }
+
+ public void setRemoveOffset(boolean removeOffset) {
+ this.removeOffset = removeOffset;
+ }
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index e80a813..8b1c228 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -321,6 +321,13 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
}
@Override
+ public void deleteSubscriptionGroup(String addr,
+ String groupName, boolean removeOffset) throws RemotingException,
MQBrokerException, InterruptedException,
+ MQClientException {
+ defaultMQAdminExtImpl.deleteSubscriptionGroup(addr, groupName,
removeOffset);
+ }
+
+ @Override
public void createAndUpdateKvConfig(String namespace, String key,
String value) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 22d4005..5c34370 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -424,7 +424,14 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
public void deleteSubscriptionGroup(String addr,
String groupName) throws RemotingException, MQBrokerException,
InterruptedException,
MQClientException {
-
this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr,
groupName, timeoutMillis);
+
this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr,
groupName, false, timeoutMillis);
+ }
+
+ @Override
+ public void deleteSubscriptionGroup(String addr,
+ String groupName, boolean removeOffset) throws RemotingException,
MQBrokerException, InterruptedException,
+ MQClientException {
+
this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr,
groupName, removeOffset, timeoutMillis);
}
@Override
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 17b6225..d5462cb 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -152,6 +152,9 @@ public interface MQAdminExt extends MQAdmin {
void deleteSubscriptionGroup(final String addr, String groupName) throws
RemotingException, MQBrokerException,
InterruptedException, MQClientException;
+ void deleteSubscriptionGroup(final String addr, String groupName, boolean
removeOffset) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException;
+
void createAndUpdateKvConfig(String namespace, String key,
String value) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
index 96d8195..fb0efeb 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
@@ -54,6 +54,10 @@ public class DeleteSubscriptionGroupCommand implements
SubCommand {
opt.setRequired(true);
options.addOption(opt);
+ opt = new Option("r", "removeOffset", true, "remove offset");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
@@ -65,11 +69,16 @@ public class DeleteSubscriptionGroupCommand implements
SubCommand {
// groupName
String groupName = commandLine.getOptionValue('g').trim();
+ boolean removeOffset = false;
+ if (commandLine.hasOption('r')) {
+ removeOffset =
Boolean.valueOf(commandLine.getOptionValue("r").trim());
+ }
+
if (commandLine.hasOption('b')) {
String addr = commandLine.getOptionValue('b').trim();
adminExt.start();
- adminExt.deleteSubscriptionGroup(addr, groupName);
+ adminExt.deleteSubscriptionGroup(addr, groupName,
removeOffset);
System.out.printf("delete subscription group [%s] from broker
[%s] success.%n", groupName,
addr);
@@ -80,7 +89,7 @@ public class DeleteSubscriptionGroupCommand implements
SubCommand {
Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName);
for (String master : masterSet) {
- adminExt.deleteSubscriptionGroup(master, groupName);
+ adminExt.deleteSubscriptionGroup(master, groupName,
removeOffset);
System.out.printf(
"delete subscription group [%s] from broker [%s] in
cluster [%s] success.%n",
groupName, master, clusterName);