This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 5672670 eager threadpool refine (#9608)
5672670 is described below
commit 5672670d8876b3a85201a38577c49cb7376bd7ab
Author: 王贺涛 <[email protected]>
AuthorDate: Sun Jan 23 17:11:42 2022 +0800
eager threadpool refine (#9608)
* set super class of AvailableCluster to AbstractCluster
* delete useless imports
* eager threadpool refine
* remove useless import
---
.../support/eager/EagerThreadPoolExecutor.java | 27 +---------------
.../common/threadpool/support/eager/TaskQueue.java | 2 +-
.../support/eager/EagerThreadPoolExecutorTest.java | 36 ++++++++++++++++++++++
.../threadpool/support/eager/TaskQueueTest.java | 6 ++--
4 files changed, 41 insertions(+), 30 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java
index d35e8d8..a0d2a34 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java
@@ -22,18 +22,12 @@ import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* EagerThreadPoolExecutor
*/
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
- /**
- * task count
- */
- private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
-
public EagerThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
@@ -43,25 +37,12 @@ public class EagerThreadPoolExecutor extends
ThreadPoolExecutor {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, handler);
}
- /**
- * @return current tasks which are executed
- */
- public int getSubmittedTaskCount() {
- return submittedTaskCount.get();
- }
-
- @Override
- protected void afterExecute(Runnable r, Throwable t) {
- submittedTaskCount.decrementAndGet();
- }
-
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
- // do not increment in method beforeExecute!
- submittedTaskCount.incrementAndGet();
+
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
@@ -69,17 +50,11 @@ public class EagerThreadPoolExecutor extends
ThreadPoolExecutor {
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
- submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is
full.", rx);
}
} catch (InterruptedException x) {
- submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
- } catch (Throwable t) {
- // decrease any way
- submittedTaskCount.decrementAndGet();
- throw t;
}
}
}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueue.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueue.java
index a798491..30bb911 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueue.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueue.java
@@ -50,7 +50,7 @@ public class TaskQueue<R extends Runnable> extends
LinkedBlockingQueue<Runnable>
int currentPoolThreadSize = executor.getPoolSize();
// have free worker. put task into queue to let the worker deal with
task.
- if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
+ if (executor.getActiveCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
diff --git
a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutorTest.java
b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutorTest.java
index 3836cd6..89e8e4a 100644
---
a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutorTest.java
+++
b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutorTest.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
public class EagerThreadPoolExecutorTest {
@@ -99,4 +100,39 @@ public class EagerThreadPoolExecutorTest {
Assertions.assertEquals("EagerThreadPoolExecutor",
executorService.getClass()
.getSimpleName(), "test spi fail!");
}
+
+ @Test
+ public void testEagerThreadPool_rejectExecution() throws Exception {
+ String name = "eager-tf";
+ int cores = 1;
+ int threads = 3;
+ int queues = 2;
+ long alive = 1000;
+
+ // init queue and executor
+ TaskQueue<Runnable> taskQueue = new TaskQueue<>(queues);
+ final EagerThreadPoolExecutor executor = new
EagerThreadPoolExecutor(cores,
+ threads,
+ alive, TimeUnit.MILLISECONDS,
+ taskQueue,
+ new NamedThreadFactory(name, true),
+ new AbortPolicyWithReport(name, URL));
+ taskQueue.setExecutor(executor);
+
+ Runnable runnable = () -> {
+ System.out.println("thread number in current pool: " +
executor.getPoolSize() + ", task number is task queue: " +
executor.getQueue().size());
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ };
+ for (int i = 0; i < 5; i++) {
+ Thread.sleep(50);
+ executor.execute(runnable);
+ }
+ Assertions.assertThrows(RejectedExecutionException.class, () ->
executor.execute(runnable));
+
+ Thread.sleep(10000);
+ }
}
diff --git
a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueueTest.java
b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueueTest.java
index dc4a8bf..534fe6a 100644
---
a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueueTest.java
+++
b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueueTest.java
@@ -43,7 +43,7 @@ public class TaskQueueTest {
TaskQueue<Runnable> queue = new TaskQueue<Runnable>(1);
EagerThreadPoolExecutor executor = mock(EagerThreadPoolExecutor.class);
Mockito.when(executor.getPoolSize()).thenReturn(2);
- Mockito.when(executor.getSubmittedTaskCount()).thenReturn(1);
+ Mockito.when(executor.getActiveCount()).thenReturn(1);
queue.setExecutor(executor);
assertThat(queue.offer(mock(Runnable.class)), is(true));
}
@@ -53,7 +53,7 @@ public class TaskQueueTest {
TaskQueue<Runnable> queue = new TaskQueue<Runnable>(1);
EagerThreadPoolExecutor executor = mock(EagerThreadPoolExecutor.class);
Mockito.when(executor.getPoolSize()).thenReturn(2);
- Mockito.when(executor.getSubmittedTaskCount()).thenReturn(2);
+ Mockito.when(executor.getActiveCount()).thenReturn(2);
Mockito.when(executor.getMaximumPoolSize()).thenReturn(4);
queue.setExecutor(executor);
assertThat(queue.offer(mock(Runnable.class)), is(false));
@@ -64,7 +64,7 @@ public class TaskQueueTest {
TaskQueue<Runnable> queue = new TaskQueue<Runnable>(1);
EagerThreadPoolExecutor executor = mock(EagerThreadPoolExecutor.class);
Mockito.when(executor.getPoolSize()).thenReturn(4);
- Mockito.when(executor.getSubmittedTaskCount()).thenReturn(4);
+ Mockito.when(executor.getActiveCount()).thenReturn(4);
Mockito.when(executor.getMaximumPoolSize()).thenReturn(4);
queue.setExecutor(executor);
assertThat(queue.offer(mock(Runnable.class)), is(true));