This is an automated email from the ASF dual-hosted git repository.

iluo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fad342  [Dubbo-2328]Fix the concurrency limit of 'ActiveLimitFilter' to calculate atomicity (#2445)
2fad342 is described below

commit 2fad342ce1bd6afe683b001d063c82fb89237b48
Author: 三笠 <[email protected]>
AuthorDate: Fri Dec 14 10:09:19 2018 +0800

    [Dubbo-2328]Fix the concurrency limit of 'ActiveLimitFilter' to calculate atomicity (#2445)
    
    * [Dubbo-2328]Fix the concurrency limit of 'ActiveLimitFilter' to calculate atomicity
    
    * format
    
    * test timeout
    
    * count fix
    
    * format
---
 .../main/java/org/apache/dubbo/rpc/RpcStatus.java  | 26 ++++++
 .../apache/dubbo/rpc/filter/ActiveLimitFilter.java | 44 +++++-----
 .../dubbo/rpc/filter/ActiveLimitFilterTest.java    | 94 +++++++++++++++++++++-
 3 files changed, 138 insertions(+), 26 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java
index 8b20cc4..1e42873 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java
@@ -53,6 +53,12 @@ public class RpcStatus {
     private volatile Semaphore executesLimit;
     private volatile int executesPermits;
 
+    /**
+     * Semaphore used to control concurrency limit set by `actives`
+     */
+    private volatile Semaphore activesLimit;
+    private volatile int activesPermits;
+
     private RpcStatus() {
     }
 
@@ -334,4 +340,24 @@ public class RpcStatus {
 
         return executesLimit;
     }
+
+
+    /**
+     * Get the semaphore for thread number. Semaphore's permits is decided by {@link Constants#ACTIVES_KEY}
+     *
+     * @param maxThreadNum value of {@link Constants#ACTIVES_KEY}
+     * @return thread number semaphore
+     */
+    public Semaphore getActivesSemaphore(int maxThreadNum) {
+        if (activesLimit == null || activesPermits != maxThreadNum) {
+            synchronized (this) {
+                if (activesLimit == null || activesPermits != maxThreadNum) {
+                    activesLimit = new Semaphore(maxThreadNum);
+                    activesPermits = maxThreadNum;
+                }
+            }
+        }
+
+        return activesLimit;
+    }
 }
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ActiveLimitFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ActiveLimitFilter.java
index 7f7aff8..2513b5e 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ActiveLimitFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ActiveLimitFilter.java
@@ -25,6 +25,8 @@ import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.RpcStatus;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 /**
  * LimitInvokerFilter
@@ -37,30 +39,28 @@ public class ActiveLimitFilter implements Filter {
         URL url = invoker.getUrl();
         String methodName = invocation.getMethodName();
         int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
+        Semaphore activesLimit = null;
+        boolean acquireResult = false;
         RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
         if (max > 0) {
             long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
             long start = System.currentTimeMillis();
-            long remain = timeout;
-            int active = count.getActive();
-            if (active >= max) {
-                synchronized (count) {
-                    while ((active = count.getActive()) >= max) {
-                        try {
-                            count.wait(remain);
-                        } catch (InterruptedException e) {
-                        }
-                        long elapsed = System.currentTimeMillis() - start;
-                        remain = timeout - elapsed;
-                        if (remain <= 0) {
-                            throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
-                                    + invoker.getInterface().getName() + ", method: "
-                                    + invocation.getMethodName() + ", elapsed: " + elapsed
-                                    + ", timeout: " + timeout + ". concurrent invokes: " + active
-                                    + ". max concurrent invoke limit: " + max);
-                        }
-                    }
+            activesLimit = count.getActivesSemaphore(max);
+            try {
+                if(!(acquireResult = activesLimit.tryAcquire(timeout,TimeUnit.MILLISECONDS))) {
+                    long elapsed = System.currentTimeMillis() - start;
+                    int active=count.getActive();
+                    throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
+                            + invoker.getInterface().getName() + ", method: "
+                            + invocation.getMethodName() + ", elapsed: " + elapsed
+                            + ", timeout: " + timeout + ". concurrent invokes: " + active
+                            + ". max concurrent invoke limit: " + max);
+
                 }
+            } catch (InterruptedException e) {
+                throw new RpcException("Waiting concurrent invoke fail in client-side for service:  "
+                        + invoker.getInterface().getName() + ", method: "
+                        + invocation.getMethodName());
             }
         }
         try {
@@ -75,10 +75,8 @@ public class ActiveLimitFilter implements Filter {
                 throw t;
             }
         } finally {
-            if (max > 0) {
-                synchronized (count) {
-                    count.notify();
-                }
+            if(acquireResult){
+                activesLimit.release();
             }
         }
     }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ActiveLimitFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ActiveLimitFilterTest.java
index b49e10f..607de85 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ActiveLimitFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ActiveLimitFilterTest.java
@@ -28,7 +28,9 @@ import org.apache.dubbo.rpc.support.MyInvoker;
 import org.junit.Test;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 
 /**
@@ -36,7 +38,6 @@ import static org.junit.Assert.assertNotSame;
  */
 public class ActiveLimitFilterTest {
 
-    private static volatile int count = 0;
     Filter activeLimitFilter = new ActiveLimitFilter();
 
     @Test
@@ -57,6 +58,7 @@ public class ActiveLimitFilterTest {
 
     @Test
     public void testInvokeGreaterActives() {
+        AtomicInteger count = new AtomicInteger(0);
         URL url = URL.valueOf("test://test:11/test?accesslog=true&group=dubbo&version=1.1&actives=1&timeout=1");
         final Invoker<ActiveLimitFilterTest> invoker = new BlockMyInvoker<ActiveLimitFilterTest>(url, 100);
         final Invocation invocation = new MockInvocation();
@@ -74,7 +76,7 @@ public class ActiveLimitFilterTest {
                         try {
                             activeLimitFilter.invoke(invoker, invocation);
                         } catch (RpcException expected) {
-                            count++;
+                            count.incrementAndGet();
                         }
                     }
                 }
@@ -88,6 +90,92 @@ public class ActiveLimitFilterTest {
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
-        assertNotSame(0, count);
+        assertNotSame(0, count.intValue());
+    }
+
+    @Test
+    public void testInvokeTimeOut() {
+        int totalThread = 100;
+        int maxActives = 10;
+        long timeout = 1;
+        long blockTime = 100;
+        AtomicInteger count = new AtomicInteger(0);
+        final CountDownLatch latch = new CountDownLatch(1);
+        final CountDownLatch latchBlocking = new CountDownLatch(totalThread);
+        URL url = URL.valueOf("test://test:11/test?accesslog=true&group=dubbo&version=1.1&actives="+maxActives+"&timeout="+timeout);
+        final Invoker<ActiveLimitFilterTest> invoker = new BlockMyInvoker<ActiveLimitFilterTest>(url, blockTime);
+        final Invocation invocation = new MockInvocation();
+        for (int i = 0; i < totalThread; i++) {
+            Thread thread = new Thread(new Runnable() {
+                public void run() {
+                    try{
+                        try {
+                            latch.await();
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+                        try {
+                            activeLimitFilter.invoke(invoker, invocation);
+                        } catch (RpcException expected) {
+                            count.incrementAndGet();
+                        }
+                    }finally {
+                        latchBlocking.countDown();
+                    }
+                }
+            });
+            thread.start();
+        }
+        latch.countDown();
+
+        try {
+            latchBlocking.await();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        assertEquals(90, count.intValue());
+    }
+
+    @Test
+    public void testInvokeNotTimeOut() {
+        int totalThread = 100;
+        int maxActives = 10;
+        long timeout = 1000;
+        long blockTime = 0;
+        AtomicInteger count = new AtomicInteger(0);
+        final CountDownLatch latch = new CountDownLatch(1);
+        final CountDownLatch latchBlocking = new CountDownLatch(totalThread);
+        URL url = URL.valueOf("test://test:11/test?accesslog=true&group=dubbo&version=1.1&actives="+maxActives+"&timeout="+timeout);
+        final Invoker<ActiveLimitFilterTest> invoker = new BlockMyInvoker<ActiveLimitFilterTest>(url, blockTime);
+        final Invocation invocation = new MockInvocation();
+        for (int i = 0; i < totalThread; i++) {
+            Thread thread = new Thread(new Runnable() {
+                public void run() {
+                    try{
+                        try {
+                            latch.await();
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+                        try {
+                            activeLimitFilter.invoke(invoker, invocation);
+                        } catch (RpcException expected) {
+                            count.incrementAndGet();
+                        }
+                    }finally {
+                        latchBlocking.countDown();
+                    }
+                }
+            });
+            thread.start();
+        }
+        latch.countDown();
+
+        try {
+            latchBlocking.await();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        assertEquals(0, count.intValue());
     }
 }
\ No newline at end of file

Reply via email to