This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 311d76f  [ISSUE 3194] [PART C] Replace AtomicLong with LongAdder in StatsItem.java to improve performance (#3351)
311d76f is described below

commit 311d76f9c851107386b08a131909335fffa2a631
Author: huangli <[email protected]>
AuthorDate: Tue Sep 14 23:48:15 2021 +0800

    [ISSUE 3194] [PART C] Replace AtomicLong with LongAdder in StatsItem.java to improve performance (#3351)
---
 .../ConsumeMessageConcurrentlyServiceTest.java     |  2 +-
 .../apache/rocketmq/common/stats/StatsItem.java    | 22 ++++++++++++----------
 .../apache/rocketmq/common/stats/StatsItemSet.java |  8 ++++----
 .../rocketmq/common/stats/StatsItemSetTest.java    |  4 +++-
 .../rocketmq/store/stats/BrokerStatsManager.java   |  6 +++---
 .../store/schedule/ScheduleMessageServiceTest.java |  6 +++---
 6 files changed, 26 insertions(+), 22 deletions(-)

diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
index e8feb80..6fa76e0 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
@@ -178,7 +178,7 @@ public class ConsumeMessageConcurrentlyServiceTest {
         StatsItemSet itemSet = (StatsItemSet)statItmeSetField.get(mgr);
         StatsItem item = itemSet.getAndCreateStatsItem(topic + "@" + 
pushConsumer.getDefaultMQPushConsumerImpl().groupName());
 
-        assertThat(item.getValue().get()).isGreaterThan(0L);
+        assertThat(item.getValue().sum()).isGreaterThan(0L);
         MessageExt msg = messageAtomic.get();
         assertThat(msg).isNotNull();
         assertThat(msg.getTopic()).isEqualTo(topic);
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
index b078551..6007cb0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
@@ -21,14 +21,16 @@ import java.util.LinkedList;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.logging.InternalLogger;
 
 public class StatsItem {
 
-    private final AtomicLong value = new AtomicLong(0);
+    private final LongAdder value = new LongAdder();
 
-    private final AtomicLong times = new AtomicLong(0);
+    private final LongAdder times = new LongAdder();
 
     private final LinkedList<CallSnapshot> csListMinute = new 
LinkedList<CallSnapshot>();
 
@@ -157,8 +159,8 @@ public class StatsItem {
             if (this.csListMinute.size() == 0) {
                 this.csListMinute.add(new 
CallSnapshot(System.currentTimeMillis() - 10 * 1000, 0, 0));
             }
-            this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), 
this.times.get(), this.value
-                .get()));
+            this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), 
this.times.sum(), this.value
+                .sum()));
             if (this.csListMinute.size() > 7) {
                 this.csListMinute.removeFirst();
             }
@@ -170,8 +172,8 @@ public class StatsItem {
             if (this.csListHour.size() == 0) {
                 this.csListHour.add(new 
CallSnapshot(System.currentTimeMillis() - 10 * 60 * 1000, 0, 0));
             }
-            this.csListHour.add(new CallSnapshot(System.currentTimeMillis(), 
this.times.get(), this.value
-                .get()));
+            this.csListHour.add(new CallSnapshot(System.currentTimeMillis(), 
this.times.sum(), this.value
+                .sum()));
             if (this.csListHour.size() > 7) {
                 this.csListHour.removeFirst();
             }
@@ -183,8 +185,8 @@ public class StatsItem {
             if (this.csListDay.size() == 0) {
                 this.csListDay.add(new CallSnapshot(System.currentTimeMillis() 
- 1 * 60 * 60 * 1000, 0, 0));
             }
-            this.csListDay.add(new CallSnapshot(System.currentTimeMillis(), 
this.times.get(), this.value
-                .get()));
+            this.csListDay.add(new CallSnapshot(System.currentTimeMillis(), 
this.times.sum(), this.value
+                .sum()));
             if (this.csListDay.size() > 25) {
                 this.csListDay.removeFirst();
             }
@@ -214,7 +216,7 @@ public class StatsItem {
                 ss.getAvgpt());
     }
 
-    public AtomicLong getValue() {
+    public LongAdder getValue() {
         return value;
     }
 
@@ -226,7 +228,7 @@ public class StatsItem {
         return statsName;
     }
 
-    public AtomicLong getTimes() {
+    public LongAdder getTimes() {
         return times;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
index a28d008..8d5418e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
@@ -154,14 +154,14 @@ public class StatsItemSet {
 
     public void addValue(final String statsKey, final int incValue, final int 
incTimes) {
         StatsItem statsItem = this.getAndCreateStatsItem(statsKey);
-        statsItem.getValue().addAndGet(incValue);
-        statsItem.getTimes().addAndGet(incTimes);
+        statsItem.getValue().add(incValue);
+        statsItem.getTimes().add(incTimes);
     }
 
     public void addRTValue(final String statsKey, final int incValue, final 
int incTimes) {
         StatsItem statsItem = this.getAndCreateRTStatsItem(statsKey);
-        statsItem.getValue().addAndGet(incValue);
-        statsItem.getTimes().addAndGet(incTimes);
+        statsItem.getValue().add(incValue);
+        statsItem.getTimes().add(incTimes);
     }
 
     public void delValue(final String statsKey) {
diff --git a/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java b/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java
index 5b4c5d8..d834160 100644
--- a/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java
@@ -23,6 +23,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.junit.After;
 import org.junit.Test;
@@ -95,7 +97,7 @@ public class StatsItemSetTest {
         }
     }
 
-    private AtomicLong test_unit() throws InterruptedException {
+    private LongAdder test_unit() throws InterruptedException {
         final StatsItemSet statsItemSet = new StatsItemSet("topicTest", 
scheduler, null);
         executor = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS,
             new ArrayBlockingQueue<Runnable>(100), new 
ThreadFactoryImpl("testMultiThread"));
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index 3e643e3..b9e11fd 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -213,15 +213,15 @@ public class BrokerStatsManager {
     }
 
     public void incBrokerPutNums() {
-        
this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet();
+        
this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(1);
     }
 
     public void incBrokerPutNums(final int incValue) {
-        
this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue);
+        
this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
     }
 
     public void incBrokerGetNums(final int incValue) {
-        
this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue);
+        
this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
     }
 
     public void incSendBackNums(final String group, final String topic) {
diff --git a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java
index fa3c6bf..d375fb0 100644
--- a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java
@@ -149,9 +149,9 @@ public class ScheduleMessageServiceTest {
         
assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.FOUND);
 
         // get the stats change
-        
assertThat(messageStore.getBrokerStatsManager().getStatsItem(BROKER_PUT_NUMS, 
brokerConfig.getBrokerClusterName()).getValue().get()).isEqualTo(1);
-        
assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS, 
topic).getValue().get()).isEqualTo(1L);
-        
assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_SIZE, 
topic).getValue().get()).isEqualTo(messageResult.getBufferTotalSize());
+        
assertThat(messageStore.getBrokerStatsManager().getStatsItem(BROKER_PUT_NUMS, 
brokerConfig.getBrokerClusterName()).getValue().sum()).isEqualTo(1);
+        
assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS, 
topic).getValue().sum()).isEqualTo(1L);
+        
assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_SIZE, 
topic).getValue().sum()).isEqualTo(messageResult.getBufferTotalSize());
 
         // get the message body
         ByteBuffer byteBuffer = 
ByteBuffer.allocate(messageResult.getBufferTotalSize());

Reply via email to