This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 3134056ac0 optimize ThreadlessExecutor (#11965)
3134056ac0 is described below
commit 3134056ac0fa32a229eada68e46b458bf07b3b41
Author: icodening <[email protected]>
AuthorDate: Tue Apr 11 10:52:59 2023 +0800
optimize ThreadlessExecutor (#11965)
* 1. optimize ThreadlessExecutor
2. tri response works on user threads
* 1. optimize ThreadlessExecutor
2. tri response works on user threads
* 1. optimize ThreadlessExecutor
2. tri response works on user threads
* 1. optimize ThreadlessExecutor
2. tri response works on user threads
* 1. optimize ThreadlessExecutor
2. tri response works on user threads
* use ThreadlessExecutor only in sync mode
* use ThreadlessExecutor only in sync mode
---------
Co-authored-by: earthchen <[email protected]>
---
.../common/threadpool/ThreadlessExecutor.java | 127 +++++++--------------
.../common/threadpool/ThreadlessExecutorTest.java | 10 +-
.../remoting/exchange/support/DefaultFuture.java | 20 +---
.../exchange/support/DefaultFutureTest.java | 8 +-
.../java/org/apache/dubbo/rpc/AsyncRpcResult.java | 16 ++-
.../dubbo/rpc/protocol/tri/DeadlineFuture.java | 25 +---
.../dubbo/rpc/protocol/tri/TripleInvoker.java | 24 +++-
.../dubbo/rpc/protocol/tri/TripleInvokerTest.java | 3 +-
8 files changed, 91 insertions(+), 142 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
index f1479e60ae..9e127af51e 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
@@ -21,11 +21,12 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import java.util.Collections;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.AbstractExecutorService;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
/**
* The most important difference between this Executor and other normal
Executor is that this one doesn't manage
@@ -38,78 +39,42 @@ import java.util.concurrent.TimeUnit;
public class ThreadlessExecutor extends AbstractExecutorService {
private static final Logger logger =
LoggerFactory.getLogger(ThreadlessExecutor.class.getName());
- private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+ private static final Object SHUTDOWN = new Object();
- private CompletableFuture<?> waitingFuture;
+ private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
- private boolean finished = false;
-
- private volatile boolean waiting = true;
-
- private final Object lock = new Object();
-
- public CompletableFuture<?> getWaitingFuture() {
- return waitingFuture;
- }
-
- public void setWaitingFuture(CompletableFuture<?> waitingFuture) {
- this.waitingFuture = waitingFuture;
- }
-
- private boolean isFinished() {
- return finished;
- }
-
- private void setFinished(boolean finished) {
- this.finished = finished;
- }
-
- public boolean isWaiting() {
- return waiting;
- }
-
- private void setWaiting(boolean waiting) {
- this.waiting = waiting;
- }
+ /**
+ * Wait thread. It must be visible to other threads and does not need to
be thread-safe
+ */
+ private volatile Object waiter;
/**
* Waits until there is a task, executes the task and all queued tasks (if
there're any). The task is either a normal
* response or a timeout response.
*/
public void waitAndDrain() throws InterruptedException {
- /**
- * Usually, {@link #waitAndDrain()} will only get called once. It
blocks for the response for the first time,
- * once the response (the task) reached and being executed
waitAndDrain will return, the whole request process
- * then finishes. Subsequent calls on {@link #waitAndDrain()} (if
there're any) should return immediately.
- *
- * There's no need to worry that {@link #finished} is not thread-safe.
Checking and updating of
- * 'finished' only appear in waitAndDrain, since waitAndDrain is
binding to one RPC call (one thread), the call
- * of it is totally sequential.
- */
- if (isFinished()) {
- return;
- }
-
- Runnable runnable;
- try {
- runnable = queue.take();
- } catch (InterruptedException e) {
- setWaiting(false);
- throw e;
+ throwIfInterrupted();
+ Runnable runnable = queue.poll();
+ if (runnable == null) {
+ waiter = Thread.currentThread();
+ try {
+ while ((runnable = queue.poll()) == null) {
+ LockSupport.park(this);
+ throwIfInterrupted();
+ }
+ } finally {
+ waiter = null;
+ }
}
-
- synchronized (lock) {
- setWaiting(false);
+ do {
runnable.run();
- }
+ } while ((runnable = queue.poll()) != null);
+ }
- runnable = queue.poll();
- while (runnable != null) {
- runnable.run();
- runnable = queue.poll();
+ private static void throwIfInterrupted() throws InterruptedException {
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
}
- // mark the status of ThreadlessExecutor as finished.
- setFinished(true);
}
/**
@@ -120,26 +85,15 @@ public class ThreadlessExecutor extends
AbstractExecutorService {
*/
@Override
public void execute(Runnable runnable) {
- runnable = new RunnableWrapper(runnable);
- synchronized (lock) {
- if (!isWaiting()) {
- runnable.run();
- return;
- }
- queue.add(runnable);
+ RunnableWrapper run = new RunnableWrapper(runnable);
+ queue.add(run);
+ if (waiter != SHUTDOWN) {
+ LockSupport.unpark((Thread) waiter);
+ } else if (queue.remove(run)) {
+ throw new RejectedExecutionException();
}
}
- /**
- * tells the thread blocking on {@link #waitAndDrain()} to return, despite
of the current status, to avoid endless waiting.
- */
- public void notifyReturn(Throwable t) {
- // an empty runnable task.
- execute(() -> {
- waitingFuture.completeExceptionally(t);
- });
- }
-
/**
* The following methods are still not supported
*/
@@ -151,23 +105,26 @@ public class ThreadlessExecutor extends
AbstractExecutorService {
@Override
public List<Runnable> shutdownNow() {
- notifyReturn(new IllegalStateException("Consumer is shutting down and
this call is going to be stopped without " +
- "receiving any result, usually this is called by a slow
provider instance or bad service implementation."));
+ waiter = SHUTDOWN;
+ Runnable runnable;
+ while ((runnable = queue.poll()) != null) {
+ runnable.run();
+ }
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
- return false;
+ return waiter == SHUTDOWN;
}
@Override
public boolean isTerminated() {
- return false;
+ return isShutdown();
}
@Override
- public boolean awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException {
+ public boolean awaitTermination(long timeout, TimeUnit unit) {
return false;
}
diff --git
a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/ThreadlessExecutorTest.java
b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/ThreadlessExecutorTest.java
index fd806b9416..5643c3c422 100644
---
a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/ThreadlessExecutorTest.java
+++
b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/ThreadlessExecutorTest.java
@@ -18,16 +18,12 @@ package org.apache.dubbo.common.threadpool;
import org.apache.dubbo.common.URL;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import java.util.concurrent.CompletableFuture;
-
class ThreadlessExecutorTest {
- private static ThreadlessExecutor executor;
+ private static final ThreadlessExecutor executor;
static {
- URL url = URL.valueOf("dubbo://127.0.0.1:12345");
executor = new ThreadlessExecutor();
}
@@ -37,10 +33,6 @@ class ThreadlessExecutorTest {
executor.execute(()->{throw new RuntimeException("test");});
}
- CompletableFuture<Object> stubFuture = new CompletableFuture<>();
- executor.setWaitingFuture(stubFuture);
- Assertions.assertEquals(executor.getWaitingFuture(),stubFuture);
-
executor.waitAndDrain();
executor.execute(()->{});
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index 9ca69e9df1..c35739a88e 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -19,7 +19,6 @@ package org.apache.dubbo.remoting.exchange.support;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.resource.GlobalResourceInitializer;
-import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import org.apache.dubbo.common.timer.HashedWheelTimer;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.common.timer.Timer;
@@ -124,10 +123,6 @@ public class DefaultFuture extends
CompletableFuture<Object> {
public static DefaultFuture newFuture(Channel channel, Request request,
int timeout, ExecutorService executor) {
final DefaultFuture future = new DefaultFuture(channel, request,
timeout);
future.setExecutor(executor);
- // ThreadlessExecutor needs to hold the waiting future in case of
circuit return.
- if (executor instanceof ThreadlessExecutor) {
- ((ThreadlessExecutor) executor).setWaitingFuture(future);
- }
// timeout check
timeoutCheck(future);
return future;
@@ -223,16 +218,6 @@ public class DefaultFuture extends
CompletableFuture<Object> {
} else {
this.completeExceptionally(new RemotingException(channel,
res.getErrorMessage()));
}
-
- // the result is returning, but the caller thread may still wait
- // to avoid endless waiting for whatever reason, notify caller thread
to return.
- if (executor != null && executor instanceof ThreadlessExecutor) {
- ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor)
executor;
- if (threadlessExecutor.isWaiting()) {
- threadlessExecutor.notifyReturn(new IllegalStateException("The
result has returned, but the biz thread is still waiting" +
- " which is not an expected state, interrupt the thread
manually by returning an exception."));
- }
- }
}
private long getId() {
@@ -288,8 +273,9 @@ public class DefaultFuture extends
CompletableFuture<Object> {
return;
}
- if (future.getExecutor() != null) {
- future.getExecutor().execute(() -> notifyTimeout(future));
+ ExecutorService executor = future.getExecutor();
+ if (executor != null && !executor.isShutdown()) {
+ executor.execute(() -> notifyTimeout(future));
} else {
notifyTimeout(future);
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
index 987ec388f2..be487e2de0 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
@@ -134,8 +134,6 @@ class DefaultFutureTest {
Channel channel = new MockedChannel();
int channelId = 10;
Request request = new Request(channelId);
- ExecutorService sharedExecutor =
ExtensionLoader.getExtensionLoader(ExecutorRepository.class)
-
.getDefaultExtension().createExecutorIfAbsent(URL.valueOf("dubbo://127.0.0.1:23456"));
ThreadlessExecutor executor = new ThreadlessExecutor();
DefaultFuture f = DefaultFuture.newFuture(channel, request, 1000,
executor);
//mark the future is sent
@@ -143,11 +141,15 @@ class DefaultFutureTest {
// get operate will throw a interrupted exception, because the thread
is interrupted.
try {
new InterruptThread(Thread.currentThread()).start();
- executor.waitAndDrain();
+ while (!f. isDone()){
+ executor.waitAndDrain();
+ }
f.get();
} catch (Exception e) {
Assertions.assertTrue(e instanceof InterruptedException, "catch
exception is not interrupted exception!");
System.out.println(e.getMessage());
+ } finally {
+ executor.shutdown();
}
//waiting timeout check task finished
Thread.sleep(1500);
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index d0e59d8554..a3759cfc84 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -182,7 +182,13 @@ public class AsyncRpcResult implements Result {
public Result get() throws InterruptedException, ExecutionException {
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor)
executor;
- threadlessExecutor.waitAndDrain();
+ try {
+ while (!responseFuture.isDone()) {
+ threadlessExecutor.waitAndDrain();
+ }
+ } finally {
+ threadlessExecutor.shutdown();
+ }
}
return responseFuture.get();
}
@@ -191,7 +197,13 @@ public class AsyncRpcResult implements Result {
public Result get(long timeout, TimeUnit unit) throws
InterruptedException, ExecutionException, TimeoutException {
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor)
executor;
- threadlessExecutor.waitAndDrain();
+ try {
+ while (!responseFuture.isDone()) {
+ threadlessExecutor.waitAndDrain();
+ }
+ } finally {
+ threadlessExecutor.shutdown();
+ }
}
return responseFuture.get(timeout, unit);
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
index f2985759fd..57e0613192 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
@@ -20,7 +20,6 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.resource.GlobalResourceInitializer;
-import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import org.apache.dubbo.common.timer.HashedWheelTimer;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.common.timer.Timer;
@@ -34,7 +33,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
public class DeadlineFuture extends CompletableFuture<AppResponse> {
@@ -47,7 +46,7 @@ public class DeadlineFuture extends
CompletableFuture<AppResponse> {
private final long start = System.currentTimeMillis();
private final List<Runnable> timeoutListeners = new ArrayList<>();
private final Timeout timeoutTask;
- private ExecutorService executor;
+ private Executor executor;
private DeadlineFuture(String serviceName, String methodName, String
address, int timeout) {
this.serviceName = serviceName;
@@ -70,13 +69,9 @@ public class DeadlineFuture extends
CompletableFuture<AppResponse> {
* @return a new DeadlineFuture
*/
public static DeadlineFuture newFuture(String serviceName, String
methodName, String address,
- int timeout, ExecutorService executor) {
+ int timeout, Executor executor) {
final DeadlineFuture future = new DeadlineFuture(serviceName,
methodName, address, timeout);
future.setExecutor(executor);
- // ThreadlessExecutor needs to hold the waiting future in case of
circuit return.
- if (executor instanceof ThreadlessExecutor) {
- ((ThreadlessExecutor) executor).setWaitingFuture(future);
- }
return future;
}
@@ -104,11 +99,11 @@ public class DeadlineFuture extends
CompletableFuture<AppResponse> {
return timeoutListeners;
}
- public ExecutorService getExecutor() {
+ public Executor getExecutor() {
return executor;
}
- public void setExecutor(ExecutorService executor) {
+ public void setExecutor(Executor executor) {
this.executor = executor;
}
@@ -135,16 +130,6 @@ public class DeadlineFuture extends
CompletableFuture<AppResponse> {
this.complete(appResponse);
- // the result is returning, but the caller thread may still waiting
- // to avoid endless waiting for whatever reason, notify caller thread
to return.
- if (executor != null && executor instanceof ThreadlessExecutor) {
- ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor)
executor;
- if (threadlessExecutor.isWaiting()) {
- threadlessExecutor.notifyReturn(new IllegalStateException(
- "The result has returned, but the biz thread is still
waiting"
- + " which is not an expected state, interrupt the
thread manually by returning an exception."));
- }
- }
}
private String getTimeoutMessage() {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index 6c11a1061a..94713b3498 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -24,16 +24,19 @@ import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.CancellationContext;
import org.apache.dubbo.rpc.FutureContext;
import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.InvokeMode;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ConsumerModel;
@@ -59,6 +62,7 @@ import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReentrantLock;
@@ -69,6 +73,7 @@ import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAI
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_REQUEST;
import static org.apache.dubbo.rpc.Constants.COMPRESSOR_KEY;
import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
+import static org.apache.dubbo.rpc.model.MethodDescriptor.RpcType.UNARY;
/**
* TripleInvoker
@@ -125,14 +130,14 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
final MethodDescriptor methodDescriptor = serviceDescriptor.getMethod(
invocation.getMethodName(),
invocation.getParameterTypes());
- ClientCall call = new TripleClientCall(connectionClient,
streamExecutor,
+ Executor callbackExecutor = isSync(methodDescriptor, invocation) ? new
ThreadlessExecutor() : streamExecutor;
+ ClientCall call = new TripleClientCall(connectionClient,
callbackExecutor,
getUrl().getOrDefaultFrameworkModel(), writeQueue);
-
AsyncRpcResult result;
try {
switch (methodDescriptor.getRpcType()) {
case UNARY:
- result = invokeUnary(methodDescriptor, invocation, call);
+ result = invokeUnary(methodDescriptor, invocation, call,
callbackExecutor);
break;
case SERVER_STREAM:
result = invokeServerStream(methodDescriptor, invocation,
call);
@@ -160,6 +165,16 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
}
}
+ private static boolean isSync(MethodDescriptor methodDescriptor,
Invocation invocation){
+ if (!(invocation instanceof RpcInvocation)) {
+ return false;
+ }
+ RpcInvocation rpcInvocation = (RpcInvocation) invocation;
+ MethodDescriptor.RpcType rpcType = methodDescriptor.getRpcType();
+ return UNARY.equals(rpcType)
+ && InvokeMode.SYNC.equals(rpcInvocation.getInvokeMode());
+ }
+
AsyncRpcResult invokeServerStream(MethodDescriptor methodDescriptor,
Invocation invocation,
ClientCall call) {
RequestMetadata request = createRequest(methodDescriptor, invocation,
null);
@@ -199,8 +214,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
}
AsyncRpcResult invokeUnary(MethodDescriptor methodDescriptor, Invocation
invocation,
- ClientCall call) {
- ExecutorService callbackExecutor = getCallbackExecutor(getUrl(),
invocation);
+ ClientCall call, Executor callbackExecutor) {
int timeout = RpcUtils.calculateTimeout(getUrl(), invocation,
invocation.getMethodName(), 3000);
if (timeout <= 0) {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
index 976d1941c7..101941423a 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
@@ -69,7 +70,7 @@ class TripleInvokerTest {
MethodDescriptor echoMethod = new ReflectionMethodDescriptor(
IGreeter.class.getDeclaredMethod("echo", String.class));
Assertions.assertTrue(invoker.isAvailable());
- invoker.invokeUnary(echoMethod, invocation, call);
+ invoker.invokeUnary(echoMethod, invocation, call, new
ThreadlessExecutor());
invoker.destroy();
Assertions.assertFalse(invoker.isAvailable());
}