This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 2841c9d fix statItem concurrency issues (#9166)
2841c9d is described below
commit 2841c9d6d17252a426f07daeff5a468946f897c2
Author: huazhongming <[email protected]>
AuthorDate: Sat Oct 30 10:28:50 2021 +0800
fix statItem concurrency issues (#9166)
---
.../org/apache/dubbo/rpc/filter/tps/StatItem.java | 33 ++++++------
.../rpc/filter/tps/DefaultTPSLimiterTest.java | 14 ++---
.../apache/dubbo/rpc/filter/tps/StatItemTest.java | 60 +++++++++++++++++++++-
3 files changed, 80 insertions(+), 27 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java
index f3d25a8..144ce49 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/StatItem.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.rpc.filter.tps;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Judge whether a particular invocation of service provider method should be
allowed within a configured time interval.
@@ -24,35 +25,32 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
class StatItem {
- private String name;
+ private final String name;
- private long lastResetTime;
+ private final AtomicLong lastResetTime;
- private long interval;
+ private final long interval;
- private AtomicInteger token;
+ private final AtomicInteger token;
- private int rate;
+ private final int rate;
StatItem(String name, int rate, long interval) {
this.name = name;
this.rate = rate;
this.interval = interval;
- this.lastResetTime = System.currentTimeMillis();
+ this.lastResetTime = new AtomicLong(System.currentTimeMillis());
this.token = new AtomicInteger(rate);
}
public boolean isAllowable() {
long now = System.currentTimeMillis();
- if (now > lastResetTime + interval) {
+ if (now > lastResetTime.get() + interval) {
token.set(rate);
- lastResetTime = now;
+ lastResetTime.set(now);
}
- if (token.decrementAndGet() < 0) {
- return false;
- }
- return true;
+ return token.decrementAndGet() >= 0;
}
public long getInterval() {
@@ -66,7 +64,7 @@ class StatItem {
long getLastResetTime() {
- return lastResetTime;
+ return lastResetTime.get();
}
int getToken() {
@@ -75,11 +73,10 @@ class StatItem {
@Override
public String toString() {
- return new StringBuilder(32).append("StatItem ")
- .append("[name=").append(name).append(", ")
- .append("rate = ").append(rate).append(", ")
- .append("interval = ").append(interval).append(']')
- .toString();
+ return "StatItem " +
+ "[name=" + name + ", " +
+ "rate = " + rate + ", " +
+ "interval = " + interval + ']';
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiterTest.java
b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiterTest.java
index 1e80af5..31650f6 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiterTest.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiterTest.java
@@ -120,15 +120,15 @@ public class DefaultTPSLimiterTest {
startLatch.countDown();
stopLatch.await();
- Assertions.assertEquals(taskList.stream().map(task ->
task.getCount()).reduce((a, b) -> a + b).get(), 100);
+
Assertions.assertEquals(taskList.stream().map(Task::getCount).reduce(Integer::sum).get(),
100);
}
- class Task implements Runnable {
- private DefaultTPSLimiter defaultTPSLimiter;
- private URL url;
- private Invocation invocation;
- private CountDownLatch startLatch;
- private CountDownLatch stopLatch;
+ static class Task implements Runnable {
+ private final DefaultTPSLimiter defaultTPSLimiter;
+ private final URL url;
+ private final Invocation invocation;
+ private final CountDownLatch startLatch;
+ private final CountDownLatch stopLatch;
private int count;
public Task(DefaultTPSLimiter defaultTPSLimiter, URL url, Invocation
invocation, CountDownLatch startLatch, CountDownLatch stopLatch) {
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/StatItemTest.java
b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/StatItemTest.java
index 1ebe753..dfeda37 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/StatItemTest.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/StatItemTest.java
@@ -17,10 +17,16 @@
package org.apache.dubbo.rpc.filter.tps;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
public class StatItemTest {
@@ -47,10 +53,60 @@ public class StatItemTest {
final int EXPECTED_RATE = 5;
statItem = new StatItem("test", EXPECTED_RATE, 60_000L);
for (int i = 1; i <= EXPECTED_RATE; i++) {
- assertEquals(true, statItem.isAllowable());
+ assertTrue(statItem.isAllowable());
}
// Must block the 6th item
- assertEquals(false, statItem.isAllowable());
+ assertFalse(statItem.isAllowable());
+ }
+
+ @Test
+ public void testConcurrency() throws Exception {
+ statItem = new StatItem("test", 100, 100000);
+
+ List<Task> taskList = new ArrayList<>();
+ int threadNum = 50;
+ CountDownLatch stopLatch = new CountDownLatch(threadNum);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ for (int i = 0; i < threadNum; i++) {
+ taskList.add(new Task(statItem, startLatch, stopLatch));
+
+ }
+ startLatch.countDown();
+ stopLatch.await();
+
+
Assertions.assertEquals(taskList.stream().map(Task::getCount).reduce(Integer::sum).get(),
100);
+ }
+
+
+ static class Task implements Runnable {
+ private final StatItem statItem;
+ private final CountDownLatch startLatch;
+ private final CountDownLatch stopLatch;
+ private int count;
+
+ public Task(StatItem statItem, CountDownLatch startLatch,
CountDownLatch stopLatch) {
+ this.statItem = statItem;
+ this.startLatch = startLatch;
+ this.stopLatch = stopLatch;
+ new Thread(this).start();
+ }
+
+ @Override
+ public void run() {
+ try {
+ startLatch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ for (int j = 0; j < 10000; j++) {
+ count = statItem.isAllowable() ? count + 1 : count;
+ }
+ stopLatch.countDown();
+ }
+
+ public int getCount() {
+ return count;
+ }
}
}