This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 3134056ac0 optimize ThreadlessExecutor (#11965)
3134056ac0 is described below

commit 3134056ac0fa32a229eada68e46b458bf07b3b41
Author: icodening <[email protected]>
AuthorDate: Tue Apr 11 10:52:59 2023 +0800

    optimize ThreadlessExecutor (#11965)
    
    * 1. optimize ThreadlessExecutor
    2. tri response works on user threads
    
    * 1. optimize ThreadlessExecutor
    2. tri response works on user threads
    
    * 1. optimize ThreadlessExecutor
    2. tri response works on user threads
    
    * 1. optimize ThreadlessExecutor
    2. tri response works on user threads
    
    * 1.optimize ThreadlessExecutor
    2.tri response works on user threads
    
    * threadless only sync mode
    
    * threadless only sync mode
    
    ---------
    
    Co-authored-by: earthchen <[email protected]>
---
 .../common/threadpool/ThreadlessExecutor.java      | 127 +++++++--------------
 .../common/threadpool/ThreadlessExecutorTest.java  |  10 +-
 .../remoting/exchange/support/DefaultFuture.java   |  20 +---
 .../exchange/support/DefaultFutureTest.java        |   8 +-
 .../java/org/apache/dubbo/rpc/AsyncRpcResult.java  |  16 ++-
 .../dubbo/rpc/protocol/tri/DeadlineFuture.java     |  25 +---
 .../dubbo/rpc/protocol/tri/TripleInvoker.java      |  24 +++-
 .../dubbo/rpc/protocol/tri/TripleInvokerTest.java  |   3 +-
 8 files changed, 91 insertions(+), 142 deletions(-)

diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
index f1479e60ae..9e127af51e 100644
--- 
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
@@ -21,11 +21,12 @@ import org.apache.dubbo.common.logger.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.AbstractExecutorService;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
 
 /**
  * The most important difference between this Executor and other normal 
Executor is that this one doesn't manage
@@ -38,78 +39,42 @@ import java.util.concurrent.TimeUnit;
 public class ThreadlessExecutor extends AbstractExecutorService {
     private static final Logger logger = 
LoggerFactory.getLogger(ThreadlessExecutor.class.getName());
 
-    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+    private static final Object SHUTDOWN = new Object();
 
-    private CompletableFuture<?> waitingFuture;
+    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
 
-    private boolean finished = false;
-
-    private volatile boolean waiting = true;
-
-    private final Object lock = new Object();
-
-    public CompletableFuture<?> getWaitingFuture() {
-        return waitingFuture;
-    }
-
-    public void setWaitingFuture(CompletableFuture<?> waitingFuture) {
-        this.waitingFuture = waitingFuture;
-    }
-
-    private boolean isFinished() {
-        return finished;
-    }
-
-    private void setFinished(boolean finished) {
-        this.finished = finished;
-    }
-
-    public boolean isWaiting() {
-        return waiting;
-    }
-
-    private void setWaiting(boolean waiting) {
-        this.waiting = waiting;
-    }
+    /**
+     * Wait thread. It must be visible to other threads and does not need to 
be thread-safe
+     */
+    private volatile Object waiter;
 
     /**
      * Waits until there is a task, executes the task and all queued tasks (if 
there're any). The task is either a normal
      * response or a timeout response.
      */
     public void waitAndDrain() throws InterruptedException {
-        /**
-         * Usually, {@link #waitAndDrain()} will only get called once. It 
blocks for the response for the first time,
-         * once the response (the task) reached and being executed 
waitAndDrain will return, the whole request process
-         * then finishes. Subsequent calls on {@link #waitAndDrain()} (if 
there're any) should return immediately.
-         *
-         * There's no need to worry that {@link #finished} is not thread-safe. 
Checking and updating of
-         * 'finished' only appear in waitAndDrain, since waitAndDrain is 
binding to one RPC call (one thread), the call
-         * of it is totally sequential.
-         */
-        if (isFinished()) {
-            return;
-        }
-
-        Runnable runnable;
-        try {
-            runnable = queue.take();
-        } catch (InterruptedException e) {
-            setWaiting(false);
-            throw e;
+        throwIfInterrupted();
+        Runnable runnable = queue.poll();
+        if (runnable == null) {
+            waiter = Thread.currentThread();
+            try {
+                while ((runnable = queue.poll()) == null) {
+                    LockSupport.park(this);
+                    throwIfInterrupted();
+                }
+            } finally {
+                waiter = null;
+            }
         }
-
-        synchronized (lock) {
-            setWaiting(false);
+        do {
             runnable.run();
-        }
+        } while ((runnable = queue.poll()) != null);
+    }
 
-        runnable = queue.poll();
-        while (runnable != null) {
-            runnable.run();
-            runnable = queue.poll();
+    private static void throwIfInterrupted() throws InterruptedException {
+        if (Thread.interrupted()) {
+            throw new InterruptedException();
         }
-        // mark the status of ThreadlessExecutor as finished.
-        setFinished(true);
     }
 
     /**
@@ -120,26 +85,15 @@ public class ThreadlessExecutor extends 
AbstractExecutorService {
      */
     @Override
     public void execute(Runnable runnable) {
-        runnable = new RunnableWrapper(runnable);
-        synchronized (lock) {
-            if (!isWaiting()) {
-                runnable.run();
-                return;
-            }
-            queue.add(runnable);
+        RunnableWrapper run = new RunnableWrapper(runnable);
+        queue.add(run);
+        if (waiter != SHUTDOWN) {
+            LockSupport.unpark((Thread) waiter);
+        } else if (queue.remove(run)) {
+            throw new RejectedExecutionException();
         }
     }
 
-    /**
-     * tells the thread blocking on {@link #waitAndDrain()} to return, despite 
of the current status, to avoid endless waiting.
-     */
-    public void notifyReturn(Throwable t) {
-        // an empty runnable task.
-        execute(() -> {
-            waitingFuture.completeExceptionally(t);
-        });
-    }
-
     /**
      * The following methods are still not supported
      */
@@ -151,23 +105,26 @@ public class ThreadlessExecutor extends 
AbstractExecutorService {
 
     @Override
     public List<Runnable> shutdownNow() {
-        notifyReturn(new IllegalStateException("Consumer is shutting down and 
this call is going to be stopped without " +
-                "receiving any result, usually this is called by a slow 
provider instance or bad service implementation."));
+        waiter = SHUTDOWN;
+        Runnable runnable;
+        while ((runnable = queue.poll()) != null) {
+            runnable.run();
+        }
         return Collections.emptyList();
     }
 
     @Override
     public boolean isShutdown() {
-        return false;
+        return waiter == SHUTDOWN;
     }
 
     @Override
     public boolean isTerminated() {
-        return false;
+        return isShutdown();
     }
 
     @Override
-    public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+    public boolean awaitTermination(long timeout, TimeUnit unit) {
         return false;
     }
 
diff --git 
a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/ThreadlessExecutorTest.java
 
b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/ThreadlessExecutorTest.java
index fd806b9416..5643c3c422 100644
--- 
a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/ThreadlessExecutorTest.java
+++ 
b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/ThreadlessExecutorTest.java
@@ -18,16 +18,12 @@ package org.apache.dubbo.common.threadpool;
 
 import org.apache.dubbo.common.URL;
 
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import java.util.concurrent.CompletableFuture;
-
 class ThreadlessExecutorTest {
-    private static ThreadlessExecutor executor;
+    private static final ThreadlessExecutor executor;
 
     static {
-        URL url = URL.valueOf("dubbo://127.0.0.1:12345");
         executor = new ThreadlessExecutor();
     }
 
@@ -37,10 +33,6 @@ class ThreadlessExecutorTest {
             executor.execute(()->{throw new RuntimeException("test");});
         }
 
-        CompletableFuture<Object> stubFuture = new CompletableFuture<>();
-        executor.setWaitingFuture(stubFuture);
-        Assertions.assertEquals(executor.getWaitingFuture(),stubFuture);
-
         executor.waitAndDrain();
 
         executor.execute(()->{});
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index 9ca69e9df1..c35739a88e 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -19,7 +19,6 @@ package org.apache.dubbo.remoting.exchange.support;
 import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.resource.GlobalResourceInitializer;
-import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
 import org.apache.dubbo.common.timer.HashedWheelTimer;
 import org.apache.dubbo.common.timer.Timeout;
 import org.apache.dubbo.common.timer.Timer;
@@ -124,10 +123,6 @@ public class DefaultFuture extends 
CompletableFuture<Object> {
     public static DefaultFuture newFuture(Channel channel, Request request, 
int timeout, ExecutorService executor) {
         final DefaultFuture future = new DefaultFuture(channel, request, 
timeout);
         future.setExecutor(executor);
-        // ThreadlessExecutor needs to hold the waiting future in case of 
circuit return.
-        if (executor instanceof ThreadlessExecutor) {
-            ((ThreadlessExecutor) executor).setWaitingFuture(future);
-        }
         // timeout check
         timeoutCheck(future);
         return future;
@@ -223,16 +218,6 @@ public class DefaultFuture extends 
CompletableFuture<Object> {
         } else {
             this.completeExceptionally(new RemotingException(channel, 
res.getErrorMessage()));
         }
-
-        // the result is returning, but the caller thread may still wait
-        // to avoid endless waiting for whatever reason, notify caller thread 
to return.
-        if (executor != null && executor instanceof ThreadlessExecutor) {
-            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) 
executor;
-            if (threadlessExecutor.isWaiting()) {
-                threadlessExecutor.notifyReturn(new IllegalStateException("The 
result has returned, but the biz thread is still waiting" +
-                    " which is not an expected state, interrupt the thread 
manually by returning an exception."));
-            }
-        }
     }
 
     private long getId() {
@@ -288,8 +273,9 @@ public class DefaultFuture extends 
CompletableFuture<Object> {
                 return;
             }
 
-            if (future.getExecutor() != null) {
-                future.getExecutor().execute(() -> notifyTimeout(future));
+            ExecutorService executor = future.getExecutor();
+            if (executor != null && !executor.isShutdown()) {
+                executor.execute(() -> notifyTimeout(future));
             } else {
                 notifyTimeout(future);
             }
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
index 987ec388f2..be487e2de0 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
@@ -134,8 +134,6 @@ class DefaultFutureTest {
         Channel channel = new MockedChannel();
         int channelId = 10;
         Request request = new Request(channelId);
-        ExecutorService sharedExecutor = 
ExtensionLoader.getExtensionLoader(ExecutorRepository.class)
-                
.getDefaultExtension().createExecutorIfAbsent(URL.valueOf("dubbo://127.0.0.1:23456"));
         ThreadlessExecutor executor = new ThreadlessExecutor();
         DefaultFuture f = DefaultFuture.newFuture(channel, request, 1000, 
executor);
         //mark the future is sent
@@ -143,11 +141,15 @@ class DefaultFutureTest {
         // get operate will throw a interrupted exception, because the thread 
is interrupted.
         try {
             new InterruptThread(Thread.currentThread()).start();
-            executor.waitAndDrain();
+            while (!f. isDone()){
+                executor.waitAndDrain();
+            }
             f.get();
         } catch (Exception e) {
             Assertions.assertTrue(e instanceof InterruptedException, "catch 
exception is not interrupted exception!");
             System.out.println(e.getMessage());
+        } finally {
+            executor.shutdown();
         }
         //waiting timeout check task finished
         Thread.sleep(1500);
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index d0e59d8554..a3759cfc84 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -182,7 +182,13 @@ public class AsyncRpcResult implements Result {
     public Result get() throws InterruptedException, ExecutionException {
         if (executor != null && executor instanceof ThreadlessExecutor) {
             ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) 
executor;
-            threadlessExecutor.waitAndDrain();
+            try {
+                while (!responseFuture.isDone()) {
+                    threadlessExecutor.waitAndDrain();
+                }
+            } finally {
+                threadlessExecutor.shutdown();
+            }
         }
         return responseFuture.get();
     }
@@ -191,7 +197,13 @@ public class AsyncRpcResult implements Result {
     public Result get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {
         if (executor != null && executor instanceof ThreadlessExecutor) {
             ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) 
executor;
-            threadlessExecutor.waitAndDrain();
+            try {
+                while (!responseFuture.isDone()) {
+                    threadlessExecutor.waitAndDrain();
+                }
+            } finally {
+                threadlessExecutor.shutdown();
+            }
         }
         return responseFuture.get(timeout, unit);
     }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
index f2985759fd..57e0613192 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
@@ -20,7 +20,6 @@ package org.apache.dubbo.rpc.protocol.tri;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.resource.GlobalResourceInitializer;
-import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
 import org.apache.dubbo.common.timer.HashedWheelTimer;
 import org.apache.dubbo.common.timer.Timeout;
 import org.apache.dubbo.common.timer.Timer;
@@ -34,7 +33,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 public class DeadlineFuture extends CompletableFuture<AppResponse> {
@@ -47,7 +46,7 @@ public class DeadlineFuture extends 
CompletableFuture<AppResponse> {
     private final long start = System.currentTimeMillis();
     private final List<Runnable> timeoutListeners = new ArrayList<>();
     private final Timeout timeoutTask;
-    private ExecutorService executor;
+    private Executor executor;
 
     private DeadlineFuture(String serviceName, String methodName, String 
address, int timeout) {
         this.serviceName = serviceName;
@@ -70,13 +69,9 @@ public class DeadlineFuture extends 
CompletableFuture<AppResponse> {
      * @return a new DeadlineFuture
      */
     public static DeadlineFuture newFuture(String serviceName, String 
methodName, String address,
-        int timeout, ExecutorService executor) {
+        int timeout, Executor executor) {
         final DeadlineFuture future = new DeadlineFuture(serviceName, 
methodName, address, timeout);
         future.setExecutor(executor);
-        // ThreadlessExecutor needs to hold the waiting future in case of 
circuit return.
-        if (executor instanceof ThreadlessExecutor) {
-            ((ThreadlessExecutor) executor).setWaitingFuture(future);
-        }
         return future;
     }
 
@@ -104,11 +99,11 @@ public class DeadlineFuture extends 
CompletableFuture<AppResponse> {
         return timeoutListeners;
     }
 
-    public ExecutorService getExecutor() {
+    public Executor getExecutor() {
         return executor;
     }
 
-    public void setExecutor(ExecutorService executor) {
+    public void setExecutor(Executor executor) {
         this.executor = executor;
     }
 
@@ -135,16 +130,6 @@ public class DeadlineFuture extends 
CompletableFuture<AppResponse> {
         this.complete(appResponse);
 
 
-        // the result is returning, but the caller thread may still waiting
-        // to avoid endless waiting for whatever reason, notify caller thread 
to return.
-        if (executor != null && executor instanceof ThreadlessExecutor) {
-            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) 
executor;
-            if (threadlessExecutor.isWaiting()) {
-                threadlessExecutor.notifyReturn(new IllegalStateException(
-                    "The result has returned, but the biz thread is still 
waiting"
-                        + " which is not an expected state, interrupt the 
thread manually by returning an exception."));
-            }
-        }
     }
 
     private String getTimeoutMessage() {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index 6c11a1061a..94713b3498 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -24,16 +24,19 @@ import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
 import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
 import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.AsyncRpcResult;
 import org.apache.dubbo.rpc.CancellationContext;
 import org.apache.dubbo.rpc.FutureContext;
 import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.InvokeMode;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.TriRpcStatus;
 import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.model.ConsumerModel;
@@ -59,6 +62,7 @@ import java.util.Arrays;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -69,6 +73,7 @@ import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAI
 import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_REQUEST;
 import static org.apache.dubbo.rpc.Constants.COMPRESSOR_KEY;
 import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
+import static org.apache.dubbo.rpc.model.MethodDescriptor.RpcType.UNARY;
 
 /**
  * TripleInvoker
@@ -125,14 +130,14 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
         final MethodDescriptor methodDescriptor = serviceDescriptor.getMethod(
             invocation.getMethodName(),
             invocation.getParameterTypes());
-        ClientCall call = new TripleClientCall(connectionClient, 
streamExecutor,
+        Executor callbackExecutor = isSync(methodDescriptor, invocation) ? new 
ThreadlessExecutor() : streamExecutor;
+        ClientCall call = new TripleClientCall(connectionClient, 
callbackExecutor,
             getUrl().getOrDefaultFrameworkModel(), writeQueue);
-
         AsyncRpcResult result;
         try {
             switch (methodDescriptor.getRpcType()) {
                 case UNARY:
-                    result = invokeUnary(methodDescriptor, invocation, call);
+                    result = invokeUnary(methodDescriptor, invocation, call, 
callbackExecutor);
                     break;
                 case SERVER_STREAM:
                     result = invokeServerStream(methodDescriptor, invocation, 
call);
@@ -160,6 +165,16 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
         }
     }
 
+    private static boolean isSync(MethodDescriptor methodDescriptor, 
Invocation invocation){
+        if (!(invocation instanceof RpcInvocation)) {
+            return false;
+        }
+        RpcInvocation rpcInvocation = (RpcInvocation) invocation;
+        MethodDescriptor.RpcType rpcType = methodDescriptor.getRpcType();
+        return UNARY.equals(rpcType)
+            && InvokeMode.SYNC.equals(rpcInvocation.getInvokeMode());
+    }
+
     AsyncRpcResult invokeServerStream(MethodDescriptor methodDescriptor, 
Invocation invocation,
                                       ClientCall call) {
         RequestMetadata request = createRequest(methodDescriptor, invocation, 
null);
@@ -199,8 +214,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
     }
 
     AsyncRpcResult invokeUnary(MethodDescriptor methodDescriptor, Invocation 
invocation,
-                               ClientCall call) {
-        ExecutorService callbackExecutor = getCallbackExecutor(getUrl(), 
invocation);
+                               ClientCall call, Executor callbackExecutor) {
 
         int timeout = RpcUtils.calculateTimeout(getUrl(), invocation, 
invocation.getMethodName(), 3000);
         if (timeout <= 0) {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
index 976d1941c7..101941423a 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
 import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
 import org.apache.dubbo.remoting.ChannelHandler;
 import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
@@ -69,7 +70,7 @@ class TripleInvokerTest {
         MethodDescriptor echoMethod = new ReflectionMethodDescriptor(
                 IGreeter.class.getDeclaredMethod("echo", String.class));
         Assertions.assertTrue(invoker.isAvailable());
-        invoker.invokeUnary(echoMethod, invocation, call);
+        invoker.invokeUnary(echoMethod, invocation, call, new 
ThreadlessExecutor());
         invoker.destroy();
         Assertions.assertFalse(invoker.isAvailable());
     }

Reply via email to