This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 2841c9d  fix statItem concurrency issues (#9166)
2841c9d is described below

commit 2841c9d6d17252a426f07daeff5a468946f897c2
Author: huazhongming <[email protected]>
AuthorDate: Sat Oct 30 10:28:50 2021 +0800

    fix statItem concurrency issues (#9166)
---
 .../org/apache/dubbo/rpc/filter/tps/StatItem.java  | 33 ++++++------
 .../rpc/filter/tps/DefaultTPSLimiterTest.java      | 14 ++---
 .../apache/dubbo/rpc/filter/tps/StatItemTest.java  | 60 +++++++++++++++++++++-
 3 files changed, 80 insertions(+), 27 deletions(-)

diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java
 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java
index f3d25a8..144ce49 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.rpc.filter.tps;
 
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Judge whether a particular invocation of service provider method should be 
allowed within a configured time interval.
@@ -24,35 +25,32 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 class StatItem {
 
-    private String name;
+    private final String name;
 
-    private long lastResetTime;
+    private final AtomicLong lastResetTime;
 
-    private long interval;
+    private final long interval;
 
-    private AtomicInteger token;
+    private final AtomicInteger token;
 
-    private int rate;
+    private final int rate;
 
     StatItem(String name, int rate, long interval) {
         this.name = name;
         this.rate = rate;
         this.interval = interval;
-        this.lastResetTime = System.currentTimeMillis();
+        this.lastResetTime = new AtomicLong(System.currentTimeMillis());
         this.token = new AtomicInteger(rate);
     }
 
     public boolean isAllowable() {
         long now = System.currentTimeMillis();
-        if (now > lastResetTime + interval) {
+        if (now > lastResetTime.get() + interval) {
             token.set(rate);
-            lastResetTime = now;
+            lastResetTime.set(now);
         }
 
-        if (token.decrementAndGet() < 0) {
-            return false;
-        }
-        return true;
+        return token.decrementAndGet() >= 0;
     }
 
     public long getInterval() {
@@ -66,7 +64,7 @@ class StatItem {
 
 
     long getLastResetTime() {
-        return lastResetTime;
+        return lastResetTime.get();
     }
 
     int getToken() {
@@ -75,11 +73,10 @@ class StatItem {
 
     @Override
     public String toString() {
-        return new StringBuilder(32).append("StatItem ")
-                .append("[name=").append(name).append(", ")
-                .append("rate = ").append(rate).append(", ")
-                .append("interval = ").append(interval).append(']')
-                .toString();
+        return "StatItem " +
+            "[name=" + name + ", " +
+            "rate = " + rate + ", " +
+            "interval = " + interval + ']';
     }
 
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiterTest.java
 
b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiterTest.java
index 1e80af5..31650f6 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiterTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiterTest.java
@@ -120,15 +120,15 @@ public class DefaultTPSLimiterTest {
         startLatch.countDown();
         stopLatch.await();
 
-        Assertions.assertEquals(taskList.stream().map(task -> 
task.getCount()).reduce((a, b) -> a + b).get(), 100);
+        
Assertions.assertEquals(taskList.stream().map(Task::getCount).reduce(Integer::sum).get(),
 100);
     }
 
-    class Task implements Runnable {
-        private DefaultTPSLimiter defaultTPSLimiter;
-        private URL url;
-        private Invocation invocation;
-        private CountDownLatch startLatch;
-        private CountDownLatch stopLatch;
+    static class Task implements Runnable {
+        private final DefaultTPSLimiter defaultTPSLimiter;
+        private final URL url;
+        private final Invocation invocation;
+        private final CountDownLatch startLatch;
+        private final CountDownLatch stopLatch;
         private int count;
 
         public Task(DefaultTPSLimiter defaultTPSLimiter, URL url, Invocation 
invocation, CountDownLatch startLatch, CountDownLatch stopLatch) {
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/StatItemTest.java
 
b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/StatItemTest.java
index 1ebe753..dfeda37 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/StatItemTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/StatItemTest.java
@@ -17,10 +17,16 @@
 package org.apache.dubbo.rpc.filter.tps;
 
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
 public class StatItemTest {
 
@@ -47,10 +53,60 @@ public class StatItemTest {
         final int EXPECTED_RATE = 5;
         statItem = new StatItem("test", EXPECTED_RATE, 60_000L);
         for (int i = 1; i <= EXPECTED_RATE; i++) {
-            assertEquals(true, statItem.isAllowable());
+            assertTrue(statItem.isAllowable());
         }
 
         // Must block the 6th item
-        assertEquals(false, statItem.isAllowable());
+        assertFalse(statItem.isAllowable());
+    }
+
+    @Test
+    public void testConcurrency() throws Exception {
+        statItem = new StatItem("test", 100, 100000);
+
+        List<Task> taskList = new ArrayList<>();
+        int threadNum = 50;
+        CountDownLatch stopLatch = new CountDownLatch(threadNum);
+        CountDownLatch startLatch = new CountDownLatch(1);
+        for (int i = 0; i < threadNum; i++) {
+            taskList.add(new Task(statItem, startLatch, stopLatch));
+
+        }
+        startLatch.countDown();
+        stopLatch.await();
+
+        
Assertions.assertEquals(taskList.stream().map(Task::getCount).reduce(Integer::sum).get(),
 100);
+    }
+
+
+    static class Task implements Runnable {
+        private final StatItem statItem;
+        private final CountDownLatch startLatch;
+        private final CountDownLatch stopLatch;
+        private int count;
+
+        public Task(StatItem statItem, CountDownLatch startLatch, 
CountDownLatch stopLatch) {
+            this.statItem = statItem;
+            this.startLatch = startLatch;
+            this.stopLatch = stopLatch;
+            new Thread(this).start();
+        }
+
+        @Override
+        public void run() {
+            try {
+                startLatch.await();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            for (int j = 0; j < 10000; j++) {
+                count = statItem.isAllowable() ? count + 1 : count;
+            }
+            stopLatch.countDown();
+        }
+
+        public int getCount() {
+            return count;
+        }
     }
 }

Reply via email to