This is an automated email from the ASF dual-hosted git repository.

icodening pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new c10d95b56a Fix RejectException (#12950)
c10d95b56a is described below

commit c10d95b56a48e88d9014238a56e7ee1ea18a33d2
Author: TomlongTK <[email protected]>
AuthorDate: Wed Oct 11 23:32:30 2023 +0800

    Fix RejectException (#12950)
    
    * fix RejectException null
    
    * fix reference
    
    * use isTerminated judge Executor
    
    * use isShutdown judge Executor
    
    ---------
    
    Co-authored-by: earthchen <[email protected]>
---
 .../common/threadpool/serial/SerializingExecutor.java     |  7 +++++--
 .../java/org/apache/dubbo/common/utils/ExecutorUtil.java  | 15 ++++++++++-----
 .../rpc/executor/AbstractIsolationExecutorSupport.java    |  1 +
 .../apache/dubbo/rpc/executor/DefaultExecutorSupport.java |  1 +
 .../main/java/org/apache/dubbo/rpc/AsyncRpcResult.java    |  4 ++--
 .../org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java | 14 ++++++--------
 6 files changed, 25 insertions(+), 17 deletions(-)

diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutor.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutor.java
index 5769c1296a..05fd83d1e4 100644
--- 
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutor.java
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutor.java
@@ -26,6 +26,7 @@ import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.threadlocal.InternalThreadLocalMap;
 
 import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_ERROR_RUN_THREAD_TASK;
+import static org.apache.dubbo.common.utils.ExecutorUtil.isShutdown;
 
 /**
  * Executor ensuring that all {@link Runnable} tasks submitted are executed in 
order
@@ -68,8 +69,10 @@ public final class SerializingExecutor implements Executor, 
Runnable {
         if (atomicBoolean.compareAndSet(false, true)) {
             boolean success = false;
             try {
-                executor.execute(this);
-                success = true;
+                if (!isShutdown(executor)) {
+                    executor.execute(this);
+                    success = true;
+                }
             } finally {
                 // It is possible that at this point that there are still 
tasks in
                 // the queue, it would be nice to keep trying but the error 
may not
diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java 
b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java
index 6c0bc148a1..f6e1ce41ca 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java
@@ -38,12 +38,17 @@ public class ExecutorUtil {
         new NamedThreadFactory("Close-ExecutorService-Timer", true));
 
     public static boolean isTerminated(Executor executor) {
-        if (executor instanceof ExecutorService) {
-            if (((ExecutorService) executor).isTerminated()) {
-                return true;
-            }
+        if (!(executor instanceof ExecutorService)) {
+            return false;
+        }
+        return ((ExecutorService) executor).isTerminated();
+    }
+
+    public static boolean isShutdown(Executor executor) {
+        if (!(executor instanceof ExecutorService)) {
+            return false;
         }
-        return false;
+        return ((ExecutorService) executor).isShutdown();
     }
 
     /**
diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/AbstractIsolationExecutorSupport.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/AbstractIsolationExecutorSupport.java
index 2a228d3cc5..1c78036a91 100644
--- 
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/AbstractIsolationExecutorSupport.java
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/AbstractIsolationExecutorSupport.java
@@ -36,6 +36,7 @@ public abstract class AbstractIsolationExecutorSupport 
implements ExecutorSuppor
         this.frameworkServiceRepository = 
url.getOrDefaultFrameworkModel().getServiceRepository();
     }
 
+    @Override
     public Executor getExecutor(Object data) {
 
         ProviderModel providerModel = getProviderModel(data);
diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/DefaultExecutorSupport.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/DefaultExecutorSupport.java
index 3cd671d201..08dd4ebee7 100644
--- 
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/DefaultExecutorSupport.java
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/DefaultExecutorSupport.java
@@ -30,6 +30,7 @@ public class DefaultExecutorSupport implements 
ExecutorSupport {
         this.executorRepository = 
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel());
     }
 
+    @Override
     public Executor getExecutor(Object data) {
         return executorRepository.getExecutor(url);
     }
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index 7244fa38fb..39e5cceb74 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -180,7 +180,7 @@ public class AsyncRpcResult implements Result {
      */
     @Override
     public Result get() throws InterruptedException, ExecutionException {
-        if (executor != null && executor instanceof ThreadlessExecutor) {
+        if (executor instanceof ThreadlessExecutor) {
             ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) 
executor;
             try {
                 while (!responseFuture.isDone() && 
!threadlessExecutor.isShutdown()) {
@@ -196,7 +196,7 @@ public class AsyncRpcResult implements Result {
     @Override
     public Result get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {
         long deadline = System.nanoTime() + unit.toNanos(timeout);
-        if (executor != null && executor instanceof ThreadlessExecutor) {
+        if (executor instanceof ThreadlessExecutor) {
             ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) 
executor;
             try {
                 while (!responseFuture.isDone() && 
!threadlessExecutor.isShutdown()) {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
index 32037a59cc..08160dcd74 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
@@ -47,6 +47,9 @@ public class DeadlineFuture extends 
CompletableFuture<AppResponse> {
     private final List<Runnable> timeoutListeners = new ArrayList<>();
     private final Timeout timeoutTask;
     private ExecutorService executor;
+    private static final GlobalResourceInitializer<Timer> TIME_OUT_TIMER = new 
GlobalResourceInitializer<>(
+        () -> new HashedWheelTimer(new 
NamedThreadFactory("dubbo-future-timeout", true), 30,
+            TimeUnit.MILLISECONDS), DeadlineFuture::destroy);
 
     private DeadlineFuture(String serviceName, String methodName, String 
address, int timeout) {
         this.serviceName = serviceName;
@@ -76,20 +79,15 @@ public class DeadlineFuture extends 
CompletableFuture<AppResponse> {
     }
 
     public void received(TriRpcStatus status, AppResponse appResponse) {
-        if (status.code != TriRpcStatus.Code.DEADLINE_EXCEEDED) {
-            // decrease Time
-            if (!timeoutTask.isCancelled()) {
-                timeoutTask.cancel();
-            }
+        if (status.code != TriRpcStatus.Code.DEADLINE_EXCEEDED && 
!timeoutTask.isCancelled()) {
+            timeoutTask.cancel();
         }
         if (getExecutor() != null) {
             getExecutor().execute(() -> doReceived(status, appResponse));
         } else {
             doReceived(status, appResponse);
         }
-    }    private static final GlobalResourceInitializer<Timer> TIME_OUT_TIMER 
= new GlobalResourceInitializer<>(
-        () -> new HashedWheelTimer(new 
NamedThreadFactory("dubbo-future-timeout", true), 30,
-            TimeUnit.MILLISECONDS), DeadlineFuture::destroy);
+    }
 
     public void addTimeoutListener(Runnable runnable) {
         timeoutListeners.add(runnable);

Reply via email to