This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9dec4cf5fe [ISSUE #7669] map variable delayLevelTable changed to
ConcurrentSkipListMap from ConcurrentHashMap (#7675)
9dec4cf5fe is described below
commit 9dec4cf5fea916cd64fc47f6a3f036e5017b6622
Author: YASH PATEL <[email protected]>
AuthorDate: Tue Dec 19 17:17:13 2023 +0530
[ISSUE #7669] map variable delayLevelTable changed to ConcurrentSkipListMap
from ConcurrentHashMap (#7675)
---
.../apache/rocketmq/broker/schedule/ScheduleMessageService.java | 5 +++--
.../main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java | 8 ++++----
.../main/java/org/apache/rocketmq/store/DefaultMessageStore.java | 5 +++--
3 files changed, 10 insertions(+), 8 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index 0c2e6507bd..ef7e4f6789 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -23,6 +23,7 @@ import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
@@ -70,8 +71,8 @@ public class ScheduleMessageService extends ConfigManager {
private static final long WAIT_FOR_SHUTDOWN = 5000L;
private static final long DELAY_FOR_A_SLEEP = 10L;
- private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis
*/> delayLevelTable =
- new ConcurrentHashMap<>(32);
+ private final ConcurrentSkipListMap<Integer /* level */, Long/* delay
timeMillis */> delayLevelTable =
+ new ConcurrentSkipListMap<>();
private final ConcurrentMap<Integer /* level */, Long/* offset */>
offsetTable =
new ConcurrentHashMap<>(32);
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index c0d00d8640..e907a1ccc3 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -24,7 +24,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@@ -199,7 +199,7 @@ public class ProxyConfig implements ConfigFile {
private boolean useDelayLevel = false;
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m
9m 10m 20m 30m 1h 2h";
- private transient Map<Integer /* level */, Long/* delay timeMillis */>
delayLevelTable = new ConcurrentHashMap<>();
+ private transient ConcurrentSkipListMap<Integer /* level */, Long/* delay
timeMillis */> delayLevelTable = new ConcurrentSkipListMap<>();
private String metricCollectorMode =
MetricCollectorMode.OFF.getModeString();
// Example address: 127.0.0.1:1234
@@ -291,7 +291,7 @@ public class ProxyConfig implements ConfigFile {
}
public void parseDelayLevel() {
- this.delayLevelTable = new ConcurrentHashMap<>();
+ this.delayLevelTable = new ConcurrentSkipListMap<>();
Map<String, Long> timeUnitTable = new HashMap<>();
timeUnitTable.put("s", 1000L);
timeUnitTable.put("m", 1000L * 60);
@@ -1124,7 +1124,7 @@ public class ProxyConfig implements ConfigFile {
this.messageDelayLevel = messageDelayLevel;
}
- public Map<Integer, Long> getDelayLevelTable() {
+ public ConcurrentSkipListMap<Integer, Long> getDelayLevelTable() {
return delayLevelTable;
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index dc5f312e5a..aa72b1617d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -46,6 +46,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -190,8 +191,8 @@ public class DefaultMessageStore implements MessageStore {
private SendMessageBackHook sendMessageBackHook;
- private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis
*/> delayLevelTable =
- new ConcurrentHashMap<>(32);
+ private final ConcurrentSkipListMap<Integer /* level */, Long/* delay
timeMillis */> delayLevelTable =
+ new ConcurrentSkipListMap<>();
private int maxDelayLevel;