This is an automated email from the ASF dual-hosted git repository.
iluo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new 2fad342 [Dubbo-2328]Fix the concurrency limit of 'ActiveLimitFilter'
to calculate atomicity (#2445)
2fad342 is described below
commit 2fad342ce1bd6afe683b001d063c82fb89237b48
Author: 三笠 <[email protected]>
AuthorDate: Fri Dec 14 10:09:19 2018 +0800
[Dubbo-2328]Fix the concurrency limit of 'ActiveLimitFilter' to calculate
atomicity (#2445)
* [Dubbo-2328]Fix the concurrency limit of 'ActiveLimitFilter' to calculate
atomicity
* format
* test timeout
* count fix
* format
---
.../main/java/org/apache/dubbo/rpc/RpcStatus.java | 26 ++++++
.../apache/dubbo/rpc/filter/ActiveLimitFilter.java | 44 +++++-----
.../dubbo/rpc/filter/ActiveLimitFilterTest.java | 94 +++++++++++++++++++++-
3 files changed, 138 insertions(+), 26 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java
index 8b20cc4..1e42873 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java
@@ -53,6 +53,12 @@ public class RpcStatus {
private volatile Semaphore executesLimit;
private volatile int executesPermits;
+ /**
+ * Semaphore used to control concurrency limit set by `actives`
+ */
+ private volatile Semaphore activesLimit;
+ private volatile int activesPermits;
+
private RpcStatus() {
}
@@ -334,4 +340,24 @@ public class RpcStatus {
return executesLimit;
}
+
+
+ /**
+ * Get the semaphore for thread number. Semaphore's permits is decided by {@link Constants#ACTIVES_KEY}
+ *
+ * @param maxThreadNum value of {@link Constants#ACTIVES_KEY}
+ * @return thread number semaphore
+ */
+ public Semaphore getActivesSemaphore(int maxThreadNum) {
+ if (activesLimit == null || activesPermits != maxThreadNum) {
+ synchronized (this) {
+ if (activesLimit == null || activesPermits != maxThreadNum) {
+ activesLimit = new Semaphore(maxThreadNum);
+ activesPermits = maxThreadNum;
+ }
+ }
+ }
+
+ return activesLimit;
+ }
}
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ActiveLimitFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ActiveLimitFilter.java
index 7f7aff8..2513b5e 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ActiveLimitFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ActiveLimitFilter.java
@@ -25,6 +25,8 @@ import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcStatus;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
/**
* LimitInvokerFilter
@@ -37,30 +39,28 @@ public class ActiveLimitFilter implements Filter {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
+ Semaphore activesLimit = null;
+ boolean acquireResult = false;
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
if (max > 0) {
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
- long remain = timeout;
- int active = count.getActive();
- if (active >= max) {
- synchronized (count) {
- while ((active = count.getActive()) >= max) {
- try {
- count.wait(remain);
- } catch (InterruptedException e) {
- }
- long elapsed = System.currentTimeMillis() - start;
- remain = timeout - elapsed;
- if (remain <= 0) {
- throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
- + invoker.getInterface().getName() + ", method: "
- + invocation.getMethodName() + ", elapsed: " + elapsed
- + ", timeout: " + timeout + ". concurrent invokes: " + active
- + ". max concurrent invoke limit: " + max);
- }
- }
+ activesLimit = count.getActivesSemaphore(max);
+ try {
+ if(!(acquireResult = activesLimit.tryAcquire(timeout,TimeUnit.MILLISECONDS))) {
+ long elapsed = System.currentTimeMillis() - start;
+ int active=count.getActive();
+ throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ + invoker.getInterface().getName() + ", method: "
+ + invocation.getMethodName() + ", elapsed: " + elapsed
+ + ", timeout: " + timeout + ". concurrent invokes: " + active
+ + ". max concurrent invoke limit: " + max);
+
}
+ } catch (InterruptedException e) {
+ throw new RpcException("Waiting concurrent invoke fail in client-side for service: "
+ + invoker.getInterface().getName() + ", method: "
+ + invocation.getMethodName());
}
}
try {
@@ -75,10 +75,8 @@ public class ActiveLimitFilter implements Filter {
throw t;
}
} finally {
- if (max > 0) {
- synchronized (count) {
- count.notify();
- }
+ if(acquireResult){
+ activesLimit.release();
}
}
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ActiveLimitFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ActiveLimitFilterTest.java
index b49e10f..607de85 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ActiveLimitFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ActiveLimitFilterTest.java
@@ -28,7 +28,9 @@ import org.apache.dubbo.rpc.support.MyInvoker;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
/**
@@ -36,7 +38,6 @@ import static org.junit.Assert.assertNotSame;
*/
public class ActiveLimitFilterTest {
- private static volatile int count = 0;
Filter activeLimitFilter = new ActiveLimitFilter();
@Test
@@ -57,6 +58,7 @@ public class ActiveLimitFilterTest {
@Test
public void testInvokeGreaterActives() {
+ AtomicInteger count = new AtomicInteger(0);
URL url = URL.valueOf("test://test:11/test?accesslog=true&group=dubbo&version=1.1&actives=1&timeout=1");
final Invoker<ActiveLimitFilterTest> invoker = new BlockMyInvoker<ActiveLimitFilterTest>(url, 100);
final Invocation invocation = new MockInvocation();
@@ -74,7 +76,7 @@ public class ActiveLimitFilterTest {
try {
activeLimitFilter.invoke(invoker, invocation);
} catch (RpcException expected) {
- count++;
+ count.incrementAndGet();
}
}
}
@@ -88,6 +90,92 @@ public class ActiveLimitFilterTest {
} catch (InterruptedException e) {
e.printStackTrace();
}
- assertNotSame(0, count);
+ assertNotSame(0, count.intValue());
+ }
+
+ @Test
+ public void testInvokeTimeOut() {
+ int totalThread = 100;
+ int maxActives = 10;
+ long timeout = 1;
+ long blockTime = 100;
+ AtomicInteger count = new AtomicInteger(0);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch latchBlocking = new CountDownLatch(totalThread);
+ URL url = URL.valueOf("test://test:11/test?accesslog=true&group=dubbo&version=1.1&actives="+maxActives+"&timeout="+timeout);
+ final Invoker<ActiveLimitFilterTest> invoker = new BlockMyInvoker<ActiveLimitFilterTest>(url, blockTime);
+ final Invocation invocation = new MockInvocation();
+ for (int i = 0; i < totalThread; i++) {
+ Thread thread = new Thread(new Runnable() {
+ public void run() {
+ try{
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ try {
+ activeLimitFilter.invoke(invoker, invocation);
+ } catch (RpcException expected) {
+ count.incrementAndGet();
+ }
+ }finally {
+ latchBlocking.countDown();
+ }
+ }
+ });
+ thread.start();
+ }
+ latch.countDown();
+
+ try {
+ latchBlocking.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ assertEquals(90, count.intValue());
+ }
+
+ @Test
+ public void testInvokeNotTimeOut() {
+ int totalThread = 100;
+ int maxActives = 10;
+ long timeout = 1000;
+ long blockTime = 0;
+ AtomicInteger count = new AtomicInteger(0);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch latchBlocking = new CountDownLatch(totalThread);
+ URL url = URL.valueOf("test://test:11/test?accesslog=true&group=dubbo&version=1.1&actives="+maxActives+"&timeout="+timeout);
+ final Invoker<ActiveLimitFilterTest> invoker = new BlockMyInvoker<ActiveLimitFilterTest>(url, blockTime);
+ final Invocation invocation = new MockInvocation();
+ for (int i = 0; i < totalThread; i++) {
+ Thread thread = new Thread(new Runnable() {
+ public void run() {
+ try{
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ try {
+ activeLimitFilter.invoke(invoker, invocation);
+ } catch (RpcException expected) {
+ count.incrementAndGet();
+ }
+ }finally {
+ latchBlocking.countDown();
+ }
+ }
+ });
+ thread.start();
+ }
+ latch.countDown();
+
+ try {
+ latchBlocking.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ assertEquals(0, count.intValue());
}
}
\ No newline at end of file