This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new b7f0162ff fix thread-safety problem of admin tools (#4843)
b7f0162ff is described below
commit b7f0162ffade63de1beec8e9271ae013401a666b
Author: HuiTong <[email protected]>
AuthorDate: Fri Aug 19 12:26:47 2022 +0800
fix thread-safety problem of admin tools (#4843)
---
.../java/org/apache/rocketmq/common/admin/ConsumeStats.java | 10 ++++++----
.../java/org/apache/rocketmq/common/admin/TopicStatsTable.java | 10 ++++++----
.../org/apache/rocketmq/common/protocol/body/TopicList.java | 5 +++--
.../org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java | 4 ++--
4 files changed, 17 insertions(+), 12 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java b/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java
index 6b1c49290..ae7e18dd2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java
+++ b/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java
@@ -16,14 +16,16 @@
*/
package org.apache.rocketmq.common.admin;
-import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class ConsumeStats extends RemotingSerializable {
- private HashMap<MessageQueue, OffsetWrapper> offsetTable = new HashMap<MessageQueue, OffsetWrapper>();
+ private Map<MessageQueue, OffsetWrapper> offsetTable = new ConcurrentHashMap<MessageQueue, OffsetWrapper>();
private double consumeTps = 0;
public long computeTotalDiff() {
@@ -39,11 +41,11 @@ public class ConsumeStats extends RemotingSerializable {
return diffTotal;
}
- public HashMap<MessageQueue, OffsetWrapper> getOffsetTable() {
+ public Map<MessageQueue, OffsetWrapper> getOffsetTable() {
return offsetTable;
}
- public void setOffsetTable(HashMap<MessageQueue, OffsetWrapper> offsetTable) {
+ public void setOffsetTable(Map<MessageQueue, OffsetWrapper> offsetTable) {
this.offsetTable = offsetTable;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java b/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java
index 729075c06..42a8872dc 100644
--- a/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java
+++ b/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java
@@ -16,18 +16,20 @@
*/
package org.apache.rocketmq.common.admin;
-import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class TopicStatsTable extends RemotingSerializable {
- private HashMap<MessageQueue, TopicOffset> offsetTable = new HashMap<MessageQueue, TopicOffset>();
+ private Map<MessageQueue, TopicOffset> offsetTable = new ConcurrentHashMap<MessageQueue, TopicOffset>();
- public HashMap<MessageQueue, TopicOffset> getOffsetTable() {
+ public Map<MessageQueue, TopicOffset> getOffsetTable() {
return offsetTable;
}
- public void setOffsetTable(HashMap<MessageQueue, TopicOffset> offsetTable) {
+ public void setOffsetTable(Map<MessageQueue, TopicOffset> offsetTable) {
this.offsetTable = offsetTable;
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java
index baf831247..9b9144e2c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java
@@ -16,12 +16,13 @@
*/
package org.apache.rocketmq.common.protocol.body;
-import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class TopicList extends RemotingSerializable {
- private Set<String> topicList = new HashSet<String>();
+ private Set<String> topicList = new CopyOnWriteArraySet<>();
private String brokerAddr;
public Set<String> getTopicList() {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 3cea455a2..bb08e0119 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -797,7 +797,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
if (!hasConsumed) {
- HashMap<MessageQueue, TopicOffset> topicStatus = this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(brokerAddr, topic, timeoutMillis).getOffsetTable();
+ Map<MessageQueue, TopicOffset> topicStatus = this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(brokerAddr, topic, timeoutMillis).getOffsetTable();
for (int i = 0; i < queueData.getReadQueueNums(); i++) {
MessageQueue queue = new MessageQueue(topic, queueData.getBrokerName(), i);
OffsetWrapper offsetWrapper = new OffsetWrapper();
@@ -1107,7 +1107,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return adminToolExecute(new AdminToolHandler() {
@Override
public AdminToolResult doExecute() throws Exception {
- final List<QueueTimeSpan> spanSet = new ArrayList<QueueTimeSpan>();
+ final List<QueueTimeSpan> spanSet = new CopyOnWriteArrayList<>();
TopicRouteData topicRouteData = examineTopicRouteInfo(topic);
if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {