This is an automated email from the ASF dual-hosted git repository.
zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 872c8cbeff remove Future from FUTURES after timeout and add RejectedExecution test fix (#15695)
872c8cbeff is described below
commit 872c8cbeffc1a80d5ef9a92da66062cb56883c69
Author: qxggg <[email protected]>
AuthorDate: Thu Sep 18 10:22:18 2025 +0800
remove Future from FUTURES after timeout and add RejectedExecution test fix (#15695)
---
.../remoting/exchange/support/DefaultFuture.java | 8 ++++-
.../exchange/support/DefaultFutureTest.java | 40 ++++++++++++++++++++++
2 files changed, 47 insertions(+), 1 deletion(-)
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index 9f2e9c2d21..f18edb0881 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
@@ -324,7 +325,12 @@ public class DefaultFuture extends CompletableFuture<Object> {
ExecutorService executor = future.getExecutor();
if (executor != null && !executor.isShutdown()) {
- executor.execute(() -> notifyTimeout(future));
+ try {
+ executor.execute(() -> notifyTimeout(future));
+ } catch (RejectedExecutionException e) {
+ notifyTimeout(future);
+ throw e;
+ }
} else {
notifyTimeout(future);
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
index 988759fd06..608ff25d23 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
@@ -27,7 +27,10 @@ import org.apache.dubbo.remoting.handler.MockedChannel;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Assertions;
@@ -224,6 +227,43 @@ class DefaultFutureTest {
Assertions.assertFalse(executor.isTerminated());
}
+ @Test
+ void testTimeoutWithRejectedExecution() throws Exception {
+ // Create a ThreadPoolExecutor with a queue capacity of 1
+ ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
+ 1, // corePoolSize
+ 1, // maxPoolSize
+ 60L,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(1), // queue capacity is 1
+ new ThreadPoolExecutor.AbortPolicy() // default rejection policy: throws exception
+ );
+ // Submit two tasks to occupy the thread and the queue
+ customExecutor.submit(() -> {
+ try {
+ Thread.sleep(500); // occupy the thread for a while
+ } catch (InterruptedException ignored) {
+ }
+ });
+ customExecutor.submit(() -> {
+ try {
+ Thread.sleep(500); // occupy the queue
+ } catch (InterruptedException ignored) {
+ }
+ });
+ // Create a Dubbo Mock Channel and a request
+ Channel channel = new MockedChannel();
+ Request request = new Request(999);
+ // Use Dubbo's newFuture and pass in the custom thread pool
+ DefaultFuture future = DefaultFuture.newFuture(channel, request, 100, customExecutor);
+ // Mark the request as sent
+ DefaultFuture.sent(channel, request);
+ // Wait for the timeout task to trigger
+ Thread.sleep(300);
+ Assertions.assertNull(DefaultFuture.getFuture(999), "Future should be removed from FUTURES after timeout");
+ customExecutor.shutdown();
+ }
+
@Test
void testClose2() {
Channel channel = new MockedChannel();