This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 311d76f [ISSUE 3194] [PART C] Replace AtomicLong with LongAdder in
StatsItem.java to improve performance (#3351)
311d76f is described below
commit 311d76f9c851107386b08a131909335fffa2a631
Author: huangli <[email protected]>
AuthorDate: Tue Sep 14 23:48:15 2021 +0800
[ISSUE 3194] [PART C] Replace AtomicLong with LongAdder in StatsItem.java
to improve performance (#3351)
---
.../ConsumeMessageConcurrentlyServiceTest.java | 2 +-
.../apache/rocketmq/common/stats/StatsItem.java | 22 ++++++++++++----------
.../apache/rocketmq/common/stats/StatsItemSet.java | 8 ++++----
.../rocketmq/common/stats/StatsItemSetTest.java | 4 +++-
.../rocketmq/store/stats/BrokerStatsManager.java | 6 +++---
.../store/schedule/ScheduleMessageServiceTest.java | 6 +++---
6 files changed, 26 insertions(+), 22 deletions(-)
diff --git
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
index e8feb80..6fa76e0 100644
---
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
@@ -178,7 +178,7 @@ public class ConsumeMessageConcurrentlyServiceTest {
StatsItemSet itemSet = (StatsItemSet)statItmeSetField.get(mgr);
StatsItem item = itemSet.getAndCreateStatsItem(topic + "@" +
pushConsumer.getDefaultMQPushConsumerImpl().groupName());
- assertThat(item.getValue().get()).isGreaterThan(0L);
+ assertThat(item.getValue().sum()).isGreaterThan(0L);
MessageExt msg = messageAtomic.get();
assertThat(msg).isNotNull();
assertThat(msg.getTopic()).isEqualTo(topic);
diff --git
a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
index b078551..6007cb0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
@@ -21,14 +21,16 @@ import java.util.LinkedList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.logging.InternalLogger;
public class StatsItem {
- private final AtomicLong value = new AtomicLong(0);
+ private final LongAdder value = new LongAdder();
- private final AtomicLong times = new AtomicLong(0);
+ private final LongAdder times = new LongAdder();
private final LinkedList<CallSnapshot> csListMinute = new
LinkedList<CallSnapshot>();
@@ -157,8 +159,8 @@ public class StatsItem {
if (this.csListMinute.size() == 0) {
this.csListMinute.add(new
CallSnapshot(System.currentTimeMillis() - 10 * 1000, 0, 0));
}
- this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(),
this.times.get(), this.value
- .get()));
+ this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(),
this.times.sum(), this.value
+ .sum()));
if (this.csListMinute.size() > 7) {
this.csListMinute.removeFirst();
}
@@ -170,8 +172,8 @@ public class StatsItem {
if (this.csListHour.size() == 0) {
this.csListHour.add(new
CallSnapshot(System.currentTimeMillis() - 10 * 60 * 1000, 0, 0));
}
- this.csListHour.add(new CallSnapshot(System.currentTimeMillis(),
this.times.get(), this.value
- .get()));
+ this.csListHour.add(new CallSnapshot(System.currentTimeMillis(),
this.times.sum(), this.value
+ .sum()));
if (this.csListHour.size() > 7) {
this.csListHour.removeFirst();
}
@@ -183,8 +185,8 @@ public class StatsItem {
if (this.csListDay.size() == 0) {
this.csListDay.add(new CallSnapshot(System.currentTimeMillis()
- 1 * 60 * 60 * 1000, 0, 0));
}
- this.csListDay.add(new CallSnapshot(System.currentTimeMillis(),
this.times.get(), this.value
- .get()));
+ this.csListDay.add(new CallSnapshot(System.currentTimeMillis(),
this.times.sum(), this.value
+ .sum()));
if (this.csListDay.size() > 25) {
this.csListDay.removeFirst();
}
@@ -214,7 +216,7 @@ public class StatsItem {
ss.getAvgpt());
}
- public AtomicLong getValue() {
+ public LongAdder getValue() {
return value;
}
@@ -226,7 +228,7 @@ public class StatsItem {
return statsName;
}
- public AtomicLong getTimes() {
+ public LongAdder getTimes() {
return times;
}
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
index a28d008..8d5418e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
@@ -154,14 +154,14 @@ public class StatsItemSet {
public void addValue(final String statsKey, final int incValue, final int
incTimes) {
StatsItem statsItem = this.getAndCreateStatsItem(statsKey);
- statsItem.getValue().addAndGet(incValue);
- statsItem.getTimes().addAndGet(incTimes);
+ statsItem.getValue().add(incValue);
+ statsItem.getTimes().add(incTimes);
}
public void addRTValue(final String statsKey, final int incValue, final
int incTimes) {
StatsItem statsItem = this.getAndCreateRTStatsItem(statsKey);
- statsItem.getValue().addAndGet(incValue);
- statsItem.getTimes().addAndGet(incTimes);
+ statsItem.getValue().add(incValue);
+ statsItem.getTimes().add(incTimes);
}
public void delValue(final String statsKey) {
diff --git
a/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java
b/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java
index 5b4c5d8..d834160 100644
---
a/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java
+++
b/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java
@@ -23,6 +23,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.junit.After;
import org.junit.Test;
@@ -95,7 +97,7 @@ public class StatsItemSetTest {
}
}
- private AtomicLong test_unit() throws InterruptedException {
+ private LongAdder test_unit() throws InterruptedException {
final StatsItemSet statsItemSet = new StatsItemSet("topicTest",
scheduler, null);
executor = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100), new
ThreadFactoryImpl("testMultiThread"));
diff --git
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index 3e643e3..b9e11fd 100644
---
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -213,15 +213,15 @@ public class BrokerStatsManager {
}
public void incBrokerPutNums() {
-
this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet();
+
this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(1);
}
public void incBrokerPutNums(final int incValue) {
-
this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue);
+
this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
}
public void incBrokerGetNums(final int incValue) {
-
this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue);
+
this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
}
public void incSendBackNums(final String group, final String topic) {
diff --git
a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java
b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java
index fa3c6bf..d375fb0 100644
---
a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java
@@ -149,9 +149,9 @@ public class ScheduleMessageServiceTest {
assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.FOUND);
// get the stats change
-
assertThat(messageStore.getBrokerStatsManager().getStatsItem(BROKER_PUT_NUMS,
brokerConfig.getBrokerClusterName()).getValue().get()).isEqualTo(1);
-
assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS,
topic).getValue().get()).isEqualTo(1L);
-
assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_SIZE,
topic).getValue().get()).isEqualTo(messageResult.getBufferTotalSize());
+
assertThat(messageStore.getBrokerStatsManager().getStatsItem(BROKER_PUT_NUMS,
brokerConfig.getBrokerClusterName()).getValue().sum()).isEqualTo(1);
+
assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS,
topic).getValue().sum()).isEqualTo(1L);
+
assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_SIZE,
topic).getValue().sum()).isEqualTo(messageResult.getBufferTotalSize());
// get the message body
ByteBuffer byteBuffer =
ByteBuffer.allocate(messageResult.getBufferTotalSize());