This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 1355b28734 fix throw RejectedExecutionException (#12674)
1355b28734 is described below
commit 1355b287342db6772f13446f061a5f04c80efd2d
Author: icodening <[email protected]>
AuthorDate: Thu Jul 6 12:40:45 2023 +0800
fix throw RejectedExecutionException (#12674)
* fix throw RejectedExecutionException
* fix throw RejectedExecutionException when ThreadlessExecutor has been
shutdown
---
.../org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java | 15 ++++++++-------
.../org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java | 5 ++---
2 files changed, 10 insertions(+), 10 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
index 57e0613192..32037a59cc 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
@@ -33,7 +33,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
public class DeadlineFuture extends CompletableFuture<AppResponse> {
@@ -46,7 +46,7 @@ public class DeadlineFuture extends
CompletableFuture<AppResponse> {
private final long start = System.currentTimeMillis();
private final List<Runnable> timeoutListeners = new ArrayList<>();
private final Timeout timeoutTask;
- private Executor executor;
+ private ExecutorService executor;
private DeadlineFuture(String serviceName, String methodName, String
address, int timeout) {
this.serviceName = serviceName;
@@ -69,7 +69,7 @@ public class DeadlineFuture extends
CompletableFuture<AppResponse> {
* @return a new DeadlineFuture
*/
public static DeadlineFuture newFuture(String serviceName, String
methodName, String address,
- int timeout, Executor executor) {
+ int timeout, ExecutorService executor) {
final DeadlineFuture future = new DeadlineFuture(serviceName,
methodName, address, timeout);
future.setExecutor(executor);
return future;
@@ -99,11 +99,11 @@ public class DeadlineFuture extends
CompletableFuture<AppResponse> {
return timeoutListeners;
}
- public Executor getExecutor() {
+ public ExecutorService getExecutor() {
return executor;
}
- public void setExecutor(Executor executor) {
+ public void setExecutor(ExecutorService executor) {
this.executor = executor;
}
@@ -149,8 +149,9 @@ public class DeadlineFuture extends
CompletableFuture<AppResponse> {
return;
}
- if (getExecutor() != null) {
- getExecutor().execute(() -> {
+ ExecutorService executor = getExecutor();
+ if (executor != null && !executor.isShutdown()) {
+ executor.execute(() -> {
notifyTimeout();
for (Runnable timeoutListener : getTimeoutListeners()) {
timeoutListener.run();
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index 0e18df346e..c5ba56b1db 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -63,7 +63,6 @@ import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReentrantLock;
@@ -135,7 +134,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
final MethodDescriptor methodDescriptor = serviceDescriptor.getMethod(
invocation.getMethodName(),
invocation.getParameterTypes());
- Executor callbackExecutor = isSync(methodDescriptor, invocation) ? new
ThreadlessExecutor() : streamExecutor;
+ ExecutorService callbackExecutor = isSync(methodDescriptor,
invocation) ? new ThreadlessExecutor() : streamExecutor;
ClientCall call = new TripleClientCall(connectionClient,
callbackExecutor,
getUrl().getOrDefaultFrameworkModel(), writeQueue);
AsyncRpcResult result;
@@ -219,7 +218,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
}
AsyncRpcResult invokeUnary(MethodDescriptor methodDescriptor, Invocation
invocation,
- ClientCall call, Executor callbackExecutor) {
+ ClientCall call, ExecutorService
callbackExecutor) {
int timeout = RpcUtils.calculateTimeout(getUrl(), invocation,
RpcUtils.getMethodName(invocation), 3000);
if (timeout <= 0) {