This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new e6e68f6b84 Fix channel close event cause hanging thread (#12503)
e6e68f6b84 is described below

commit e6e68f6b8486ecc81973ad1a8b90b874c85086fe
Author: Albumen Kevin <[email protected]>
AuthorDate: Tue Jun 13 10:27:38 2023 +0800

    Fix channel close event cause hanging thread (#12503)
    
    * Fix channel close event cause hanging thread
    
    * Fix check
    
    * fix npe
    
    * fix ut
    
    * fix check
    
    * fix uts
    
    * fix testing
---
 .../common/threadpool/ThreadlessExecutor.java      | 39 +++++++++++++---------
 .../remoting/exchange/support/DefaultFuture.java   | 12 ++++++-
 .../exchange/support/DefaultFutureTest.java        | 12 ++++++-
 .../java/org/apache/dubbo/rpc/AsyncRpcResult.java  |  4 +--
 4 files changed, 47 insertions(+), 20 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
index 23ef3cf235..5ff664bc0b 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
@@ -26,6 +26,7 @@ import java.util.concurrent.AbstractExecutorService;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.LockSupport;
 
 /**
@@ -46,7 +47,7 @@ public class ThreadlessExecutor extends AbstractExecutorService {
     /**
      * Wait thread. It must be visible to other threads and does not need to be thread-safe
      */
-    private volatile Object waiter;
+    private final AtomicReference<Object> waiter = new AtomicReference<>();
 
     /**
      * Waits until there is a task, executes the task and all queued tasks (if there're any). The task is either a normal
@@ -56,22 +57,25 @@ public class ThreadlessExecutor extends AbstractExecutorService {
         throwIfInterrupted();
         Runnable runnable = queue.poll();
         if (runnable == null) {
-            waiter = Thread.currentThread();
-            try {
-                while ((runnable = queue.poll()) == null) {
-                    long restTime = deadline - System.nanoTime();
-                    if (restTime <= 0) {
-                        return;
+            if (waiter.compareAndSet(null, Thread.currentThread())) {
+                try {
+                    while ((runnable = queue.poll()) == null && waiter.get() == Thread.currentThread()) {
+                        long restTime = deadline - System.nanoTime();
+                        if (restTime <= 0) {
+                            return;
+                        }
+                        LockSupport.parkNanos(this, restTime);
+                        throwIfInterrupted();
                     }
-                    LockSupport.parkNanos(this, restTime);
-                    throwIfInterrupted();
+                } finally {
+                    waiter.compareAndSet(Thread.currentThread(), null);
                 }
-            } finally {
-                waiter = null;
             }
         }
         do {
-            runnable.run();
+            if (runnable != null) {
+                runnable.run();
+            }
         } while ((runnable = queue.poll()) != null);
     }
 
@@ -91,8 +95,8 @@ public class ThreadlessExecutor extends AbstractExecutorService {
     public void execute(Runnable runnable) {
         RunnableWrapper run = new RunnableWrapper(runnable);
         queue.add(run);
-        if (waiter != SHUTDOWN) {
-            LockSupport.unpark((Thread) waiter);
+        if (waiter.get() != SHUTDOWN) {
+            LockSupport.unpark((Thread) waiter.get());
         } else if (queue.remove(run)) {
             throw new RejectedExecutionException();
         }
@@ -109,7 +113,10 @@ public class ThreadlessExecutor extends AbstractExecutorService {
 
     @Override
     public List<Runnable> shutdownNow() {
-        waiter = SHUTDOWN;
+        if (waiter.get() != SHUTDOWN) {
+            LockSupport.unpark((Thread) waiter.get());
+        }
+        waiter.set(SHUTDOWN);
         Runnable runnable;
         while ((runnable = queue.poll()) != null) {
             runnable.run();
@@ -119,7 +126,7 @@ public class ThreadlessExecutor extends AbstractExecutorService {
 
     @Override
     public boolean isShutdown() {
-        return waiter == SHUTDOWN;
+        return waiter.get() == SHUTDOWN;
     }
 
     @Override
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index 3539723adb..f49ef15867 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.resource.GlobalResourceInitializer;
 import org.apache.dubbo.common.serialize.SerializationException;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
 import org.apache.dubbo.common.timer.HashedWheelTimer;
 import org.apache.dubbo.common.timer.Timeout;
 import org.apache.dubbo.common.timer.Timer;
@@ -197,6 +198,7 @@ public class DefaultFuture extends CompletableFuture<Object> {
                     t.cancel();
                 }
                 future.doReceived(response);
+                shutdownExecutorIfNeeded(future);
             } else {
                 logger.warn(PROTOCOL_TIMEOUT_SERVER, "", "", "The timeout response finally returned at "
                     + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
@@ -215,12 +217,20 @@ public class DefaultFuture extends CompletableFuture<Object> {
         errorResult.setStatus(Response.CLIENT_ERROR);
         errorResult.setErrorMessage("request future has been canceled.");
         this.doReceived(errorResult);
-        FUTURES.remove(id);
+        DefaultFuture future = FUTURES.remove(id);
+        shutdownExecutorIfNeeded(future);
         CHANNELS.remove(id);
         timeoutCheckTask.cancel();
         return true;
     }
 
+    private static void shutdownExecutorIfNeeded(DefaultFuture future) {
+        ExecutorService executor = future.getExecutor();
+        if (executor instanceof ThreadlessExecutor && !executor.isShutdown()) {
+            executor.shutdownNow();
+        }
+    }
+
     public void cancel() {
         this.cancel(true);
     }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
index 1e28237370..8188720480 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
@@ -161,7 +161,7 @@ class DefaultFutureTest {
     }
 
     @Test
-    void testClose() {
+    void testClose1() {
         Channel channel = new MockedChannel();
         Request request = new Request(123);
         ExecutorService executor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class)
@@ -171,6 +171,16 @@ class DefaultFutureTest {
         Assertions.assertFalse(executor.isTerminated());
     }
 
+    @Test
+    void testClose2() {
+        Channel channel = new MockedChannel();
+        Request request = new Request(123);
+        ThreadlessExecutor threadlessExecutor = new ThreadlessExecutor();
+        DefaultFuture.newFuture(channel, request, 1000, threadlessExecutor);
+        DefaultFuture.closeChannel(channel, 0);
+        Assertions.assertTrue(threadlessExecutor.isTerminated());
+    }
+
     /**
      * mock a default future
      */
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index 5e516c72a9..7244fa38fb 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -183,7 +183,7 @@ public class AsyncRpcResult implements Result {
         if (executor != null && executor instanceof ThreadlessExecutor) {
             ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
             try {
-                while (!responseFuture.isDone()) {
+                while (!responseFuture.isDone() && !threadlessExecutor.isShutdown()) {
                     threadlessExecutor.waitAndDrain(Long.MAX_VALUE);
                 }
             } finally {
@@ -199,7 +199,7 @@ public class AsyncRpcResult implements Result {
         if (executor != null && executor instanceof ThreadlessExecutor) {
             ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
             try {
-                while (!responseFuture.isDone()) {
+                while (!responseFuture.isDone() && !threadlessExecutor.isShutdown()) {
                     long restTime = deadline - System.nanoTime();
                     if (restTime > 0) {
                         threadlessExecutor.waitAndDrain(deadline);

Reply via email to