This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 5672670  eager threadpool refine (#9608)
5672670 is described below

commit 5672670d8876b3a85201a38577c49cb7376bd7ab
Author: 王贺涛 <[email protected]>
AuthorDate: Sun Jan 23 17:11:42 2022 +0800

    eager threadpool refine (#9608)
    
    * set super class of AvailableCluster to AbstractCluster
    
    * delete useless imports
    
    * eager threadpool refine
    
    * remove useless import
---
 .../support/eager/EagerThreadPoolExecutor.java     | 27 +---------------
 .../common/threadpool/support/eager/TaskQueue.java |  2 +-
 .../support/eager/EagerThreadPoolExecutorTest.java | 36 ++++++++++++++++++++++
 .../threadpool/support/eager/TaskQueueTest.java    |  6 ++--
 4 files changed, 41 insertions(+), 30 deletions(-)

diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java
index d35e8d8..a0d2a34 100644
--- 
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java
@@ -22,18 +22,12 @@ import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * EagerThreadPoolExecutor
  */
 public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
 
-    /**
-     * task count
-     */
-    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
-
     public EagerThreadPoolExecutor(int corePoolSize,
                                    int maximumPoolSize,
                                    long keepAliveTime,
@@ -43,25 +37,12 @@ public class EagerThreadPoolExecutor extends 
ThreadPoolExecutor {
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory, handler);
     }
 
-    /**
-     * @return current tasks which are executed
-     */
-    public int getSubmittedTaskCount() {
-        return submittedTaskCount.get();
-    }
-
-    @Override
-    protected void afterExecute(Runnable r, Throwable t) {
-        submittedTaskCount.decrementAndGet();
-    }
-
     @Override
     public void execute(Runnable command) {
         if (command == null) {
             throw new NullPointerException();
         }
-        // do not increment in method beforeExecute!
-        submittedTaskCount.incrementAndGet();
+
         try {
             super.execute(command);
         } catch (RejectedExecutionException rx) {
@@ -69,17 +50,11 @@ public class EagerThreadPoolExecutor extends 
ThreadPoolExecutor {
             final TaskQueue queue = (TaskQueue) super.getQueue();
             try {
                 if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
-                    submittedTaskCount.decrementAndGet();
                     throw new RejectedExecutionException("Queue capacity is 
full.", rx);
                 }
             } catch (InterruptedException x) {
-                submittedTaskCount.decrementAndGet();
                 throw new RejectedExecutionException(x);
             }
-        } catch (Throwable t) {
-            // decrease any way
-            submittedTaskCount.decrementAndGet();
-            throw t;
         }
     }
 }
diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueue.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueue.java
index a798491..30bb911 100644
--- 
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueue.java
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueue.java
@@ -50,7 +50,7 @@ public class TaskQueue<R extends Runnable> extends 
LinkedBlockingQueue<Runnable>
 
         int currentPoolThreadSize = executor.getPoolSize();
         // have free worker. put task into queue to let the worker deal with 
task.
-        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
+        if (executor.getActiveCount() < currentPoolThreadSize) {
             return super.offer(runnable);
         }
 
diff --git 
a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutorTest.java
 
b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutorTest.java
index 3836cd6..89e8e4a 100644
--- 
a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutorTest.java
+++ 
b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutorTest.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
 public class EagerThreadPoolExecutorTest {
@@ -99,4 +100,39 @@ public class EagerThreadPoolExecutorTest {
         Assertions.assertEquals("EagerThreadPoolExecutor", 
executorService.getClass()
             .getSimpleName(), "test spi fail!");
     }
+
+    @Test
+    public void testEagerThreadPool_rejectExecution() throws Exception {
+        String name = "eager-tf";
+        int cores = 1;
+        int threads = 3;
+        int queues = 2;
+        long alive = 1000;
+
+        // init queue and executor
+        TaskQueue<Runnable> taskQueue = new TaskQueue<>(queues);
+        final EagerThreadPoolExecutor executor = new 
EagerThreadPoolExecutor(cores,
+            threads,
+            alive, TimeUnit.MILLISECONDS,
+            taskQueue,
+            new NamedThreadFactory(name, true),
+            new AbortPolicyWithReport(name, URL));
+        taskQueue.setExecutor(executor);
+
+        Runnable runnable = () -> {
+            System.out.println("thread number in current pool: " + 
executor.getPoolSize() + ", task number is task queue: " + 
executor.getQueue().size());
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        };
+        for (int i = 0; i < 5; i++) {
+            Thread.sleep(50);
+            executor.execute(runnable);
+        }
+        Assertions.assertThrows(RejectedExecutionException.class, () -> 
executor.execute(runnable));
+
+        Thread.sleep(10000);
+    }
 }
diff --git 
a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueueTest.java
 
b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueueTest.java
index dc4a8bf..534fe6a 100644
--- 
a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueueTest.java
+++ 
b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueueTest.java
@@ -43,7 +43,7 @@ public class TaskQueueTest {
         TaskQueue<Runnable> queue = new TaskQueue<Runnable>(1);
         EagerThreadPoolExecutor executor = mock(EagerThreadPoolExecutor.class);
         Mockito.when(executor.getPoolSize()).thenReturn(2);
-        Mockito.when(executor.getSubmittedTaskCount()).thenReturn(1);
+        Mockito.when(executor.getActiveCount()).thenReturn(1);
         queue.setExecutor(executor);
         assertThat(queue.offer(mock(Runnable.class)), is(true));
     }
@@ -53,7 +53,7 @@ public class TaskQueueTest {
         TaskQueue<Runnable> queue = new TaskQueue<Runnable>(1);
         EagerThreadPoolExecutor executor = mock(EagerThreadPoolExecutor.class);
         Mockito.when(executor.getPoolSize()).thenReturn(2);
-        Mockito.when(executor.getSubmittedTaskCount()).thenReturn(2);
+        Mockito.when(executor.getActiveCount()).thenReturn(2);
         Mockito.when(executor.getMaximumPoolSize()).thenReturn(4);
         queue.setExecutor(executor);
         assertThat(queue.offer(mock(Runnable.class)), is(false));
@@ -64,7 +64,7 @@ public class TaskQueueTest {
         TaskQueue<Runnable> queue = new TaskQueue<Runnable>(1);
         EagerThreadPoolExecutor executor = mock(EagerThreadPoolExecutor.class);
         Mockito.when(executor.getPoolSize()).thenReturn(4);
-        Mockito.when(executor.getSubmittedTaskCount()).thenReturn(4);
+        Mockito.when(executor.getActiveCount()).thenReturn(4);
         Mockito.when(executor.getMaximumPoolSize()).thenReturn(4);
         queue.setExecutor(executor);
         assertThat(queue.offer(mock(Runnable.class)), is(true));

Reply via email to