This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new e6e68f6b84 Fix channel close event cause hanging thread (#12503)
e6e68f6b84 is described below
commit e6e68f6b8486ecc81973ad1a8b90b874c85086fe
Author: Albumen Kevin <[email protected]>
AuthorDate: Tue Jun 13 10:27:38 2023 +0800
Fix channel close event cause hanging thread (#12503)
* Fix channel close event cause hanging thread
* Fix check
* fix npe
* fix ut
* fix check
* fix uts
* fix testing
---
.../common/threadpool/ThreadlessExecutor.java | 39 +++++++++++++---------
.../remoting/exchange/support/DefaultFuture.java | 12 ++++++-
.../exchange/support/DefaultFutureTest.java | 12 ++++++-
.../java/org/apache/dubbo/rpc/AsyncRpcResult.java | 4 +--
4 files changed, 47 insertions(+), 20 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
index 23ef3cf235..5ff664bc0b 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
@@ -26,6 +26,7 @@ import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
/**
@@ -46,7 +47,7 @@ public class ThreadlessExecutor extends
AbstractExecutorService {
/**
* Wait thread. It must be visible to other threads and does not need to
be thread-safe
*/
- private volatile Object waiter;
+ private final AtomicReference<Object> waiter = new AtomicReference<>();
/**
* Waits until there is a task, executes the task and all queued tasks (if
there're any). The task is either a normal
@@ -56,22 +57,25 @@ public class ThreadlessExecutor extends
AbstractExecutorService {
throwIfInterrupted();
Runnable runnable = queue.poll();
if (runnable == null) {
- waiter = Thread.currentThread();
- try {
- while ((runnable = queue.poll()) == null) {
- long restTime = deadline - System.nanoTime();
- if (restTime <= 0) {
- return;
+ if (waiter.compareAndSet(null, Thread.currentThread())) {
+ try {
+ while ((runnable = queue.poll()) == null && waiter.get()
== Thread.currentThread()) {
+ long restTime = deadline - System.nanoTime();
+ if (restTime <= 0) {
+ return;
+ }
+ LockSupport.parkNanos(this, restTime);
+ throwIfInterrupted();
}
- LockSupport.parkNanos(this, restTime);
- throwIfInterrupted();
+ } finally {
+ waiter.compareAndSet(Thread.currentThread(), null);
}
- } finally {
- waiter = null;
}
}
do {
- runnable.run();
+ if (runnable != null) {
+ runnable.run();
+ }
} while ((runnable = queue.poll()) != null);
}
@@ -91,8 +95,8 @@ public class ThreadlessExecutor extends
AbstractExecutorService {
public void execute(Runnable runnable) {
RunnableWrapper run = new RunnableWrapper(runnable);
queue.add(run);
- if (waiter != SHUTDOWN) {
- LockSupport.unpark((Thread) waiter);
+ if (waiter.get() != SHUTDOWN) {
+ LockSupport.unpark((Thread) waiter.get());
} else if (queue.remove(run)) {
throw new RejectedExecutionException();
}
@@ -109,7 +113,10 @@ public class ThreadlessExecutor extends
AbstractExecutorService {
@Override
public List<Runnable> shutdownNow() {
- waiter = SHUTDOWN;
+ if (waiter.get() != SHUTDOWN) {
+ LockSupport.unpark((Thread) waiter.get());
+ }
+ waiter.set(SHUTDOWN);
Runnable runnable;
while ((runnable = queue.poll()) != null) {
runnable.run();
@@ -119,7 +126,7 @@ public class ThreadlessExecutor extends
AbstractExecutorService {
@Override
public boolean isShutdown() {
- return waiter == SHUTDOWN;
+ return waiter.get() == SHUTDOWN;
}
@Override
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index 3539723adb..f49ef15867 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.resource.GlobalResourceInitializer;
import org.apache.dubbo.common.serialize.SerializationException;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import org.apache.dubbo.common.timer.HashedWheelTimer;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.common.timer.Timer;
@@ -197,6 +198,7 @@ public class DefaultFuture extends
CompletableFuture<Object> {
t.cancel();
}
future.doReceived(response);
+ shutdownExecutorIfNeeded(future);
} else {
logger.warn(PROTOCOL_TIMEOUT_SERVER, "", "", "The timeout
response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss.SSS").format(new Date()))
@@ -215,12 +217,20 @@ public class DefaultFuture extends
CompletableFuture<Object> {
errorResult.setStatus(Response.CLIENT_ERROR);
errorResult.setErrorMessage("request future has been canceled.");
this.doReceived(errorResult);
- FUTURES.remove(id);
+ DefaultFuture future = FUTURES.remove(id);
+ shutdownExecutorIfNeeded(future);
CHANNELS.remove(id);
timeoutCheckTask.cancel();
return true;
}
+ private static void shutdownExecutorIfNeeded(DefaultFuture future) {
+ ExecutorService executor = future.getExecutor();
+ if (executor instanceof ThreadlessExecutor && !executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+
public void cancel() {
this.cancel(true);
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
index 1e28237370..8188720480 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
@@ -161,7 +161,7 @@ class DefaultFutureTest {
}
@Test
- void testClose() {
+ void testClose1() {
Channel channel = new MockedChannel();
Request request = new Request(123);
ExecutorService executor =
ExtensionLoader.getExtensionLoader(ExecutorRepository.class)
@@ -171,6 +171,16 @@ class DefaultFutureTest {
Assertions.assertFalse(executor.isTerminated());
}
+ @Test
+ void testClose2() {
+ Channel channel = new MockedChannel();
+ Request request = new Request(123);
+ ThreadlessExecutor threadlessExecutor = new ThreadlessExecutor();
+ DefaultFuture.newFuture(channel, request, 1000, threadlessExecutor);
+ DefaultFuture.closeChannel(channel, 0);
+ Assertions.assertTrue(threadlessExecutor.isTerminated());
+ }
+
/**
* mock a default future
*/
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index 5e516c72a9..7244fa38fb 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -183,7 +183,7 @@ public class AsyncRpcResult implements Result {
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor)
executor;
try {
- while (!responseFuture.isDone()) {
+ while (!responseFuture.isDone() &&
!threadlessExecutor.isShutdown()) {
threadlessExecutor.waitAndDrain(Long.MAX_VALUE);
}
} finally {
@@ -199,7 +199,7 @@ public class AsyncRpcResult implements Result {
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor)
executor;
try {
- while (!responseFuture.isDone()) {
+ while (!responseFuture.isDone() &&
!threadlessExecutor.isShutdown()) {
long restTime = deadline - System.nanoTime();
if (restTime > 0) {
threadlessExecutor.waitAndDrain(deadline);