This is an automated email from the ASF dual-hosted git repository.

zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 872c8cbeff remove Future from FUTURES after timeout and add RejectedExecution test fix  (#15695)
872c8cbeff is described below

commit 872c8cbeffc1a80d5ef9a92da66062cb56883c69
Author: qxggg <[email protected]>
AuthorDate: Thu Sep 18 10:22:18 2025 +0800

    remove Future from FUTURES after timeout and add RejectedExecution test fix  (#15695)
---
 .../remoting/exchange/support/DefaultFuture.java   |  8 ++++-
 .../exchange/support/DefaultFutureTest.java        | 40 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index 9f2e9c2d21..f18edb0881 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
@@ -324,7 +325,12 @@ public class DefaultFuture extends CompletableFuture<Object> {
 
             ExecutorService executor = future.getExecutor();
             if (executor != null && !executor.isShutdown()) {
-                executor.execute(() -> notifyTimeout(future));
+                try {
+                    executor.execute(() -> notifyTimeout(future));
+                } catch (RejectedExecutionException e) {
+                    notifyTimeout(future);
+                    throw e;
+                }
             } else {
                 notifyTimeout(future);
             }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
index 988759fd06..608ff25d23 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
@@ -27,7 +27,10 @@ import org.apache.dubbo.remoting.handler.MockedChannel;
 
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.jupiter.api.Assertions;
@@ -224,6 +227,43 @@ class DefaultFutureTest {
         Assertions.assertFalse(executor.isTerminated());
     }
 
+    @Test
+    void testTimeoutWithRejectedExecution() throws Exception {
+        // Create a ThreadPoolExecutor with a queue capacity of 1
+        ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
+                1, // corePoolSize
+                1, // maxPoolSize
+                60L,
+                TimeUnit.SECONDS,
+                new ArrayBlockingQueue<>(1), // queue capacity is 1
+                new ThreadPoolExecutor.AbortPolicy() // default rejection policy: throws exception
+                );
+        // Submit two tasks to occupy the thread and the queue
+        customExecutor.submit(() -> {
+            try {
+                Thread.sleep(500); // occupy the thread for a while
+            } catch (InterruptedException ignored) {
+            }
+        });
+        customExecutor.submit(() -> {
+            try {
+                Thread.sleep(500); // occupy the queue
+            } catch (InterruptedException ignored) {
+            }
+        });
+        // Create a Dubbo Mock Channel and a request
+        Channel channel = new MockedChannel();
+        Request request = new Request(999);
+        // Use Dubbo's newFuture and pass in the custom thread pool
+        DefaultFuture future = DefaultFuture.newFuture(channel, request, 100, customExecutor);
+        // Mark the request as sent
+        DefaultFuture.sent(channel, request);
+        // Wait for the timeout task to trigger
+        Thread.sleep(300);
+        Assertions.assertNull(DefaultFuture.getFuture(999), "Future should be removed from FUTURES after timeout");
+        customExecutor.shutdown();
+    }
+
     @Test
     void testClose2() {
         Channel channel = new MockedChannel();

Reply via email to