This is an automated email from the ASF dual-hosted git repository.
icodening pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new c10d95b56a Fix RejectException (#12950)
c10d95b56a is described below
commit c10d95b56a48e88d9014238a56e7ee1ea18a33d2
Author: TomlongTK <[email protected]>
AuthorDate: Wed Oct 11 23:32:30 2023 +0800
Fix RejectException (#12950)
* fix RejectException null
* fix reference
* use isTerminated to judge Executor state
* use isShutdown to judge Executor state
---------
Co-authored-by: earthchen <[email protected]>
---
.../common/threadpool/serial/SerializingExecutor.java | 7 +++++--
.../java/org/apache/dubbo/common/utils/ExecutorUtil.java | 15 ++++++++++-----
.../rpc/executor/AbstractIsolationExecutorSupport.java | 1 +
.../apache/dubbo/rpc/executor/DefaultExecutorSupport.java | 1 +
.../main/java/org/apache/dubbo/rpc/AsyncRpcResult.java | 4 ++--
.../org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java | 14 ++++++--------
6 files changed, 25 insertions(+), 17 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutor.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutor.java
index 5769c1296a..05fd83d1e4 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutor.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/serial/SerializingExecutor.java
@@ -26,6 +26,7 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadlocal.InternalThreadLocalMap;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_ERROR_RUN_THREAD_TASK;
+import static org.apache.dubbo.common.utils.ExecutorUtil.isShutdown;
/**
* Executor ensuring that all {@link Runnable} tasks submitted are executed in
order
@@ -68,8 +69,10 @@ public final class SerializingExecutor implements Executor,
Runnable {
if (atomicBoolean.compareAndSet(false, true)) {
boolean success = false;
try {
- executor.execute(this);
- success = true;
+ if (!isShutdown(executor)) {
+ executor.execute(this);
+ success = true;
+ }
} finally {
// It is possible that at this point that there are still
tasks in
// the queue, it would be nice to keep trying but the error
may not
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java
index 6c0bc148a1..f6e1ce41ca 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java
@@ -38,12 +38,17 @@ public class ExecutorUtil {
new NamedThreadFactory("Close-ExecutorService-Timer", true));
public static boolean isTerminated(Executor executor) {
- if (executor instanceof ExecutorService) {
- if (((ExecutorService) executor).isTerminated()) {
- return true;
- }
+ if (!(executor instanceof ExecutorService)) {
+ return false;
+ }
+ return ((ExecutorService) executor).isTerminated();
+ }
+
+ public static boolean isShutdown(Executor executor) {
+ if (!(executor instanceof ExecutorService)) {
+ return false;
}
- return false;
+ return ((ExecutorService) executor).isShutdown();
}
/**
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/AbstractIsolationExecutorSupport.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/AbstractIsolationExecutorSupport.java
index 2a228d3cc5..1c78036a91 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/AbstractIsolationExecutorSupport.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/AbstractIsolationExecutorSupport.java
@@ -36,6 +36,7 @@ public abstract class AbstractIsolationExecutorSupport
implements ExecutorSuppor
this.frameworkServiceRepository =
url.getOrDefaultFrameworkModel().getServiceRepository();
}
+ @Override
public Executor getExecutor(Object data) {
ProviderModel providerModel = getProviderModel(data);
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/DefaultExecutorSupport.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/DefaultExecutorSupport.java
index 3cd671d201..08dd4ebee7 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/DefaultExecutorSupport.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/DefaultExecutorSupport.java
@@ -30,6 +30,7 @@ public class DefaultExecutorSupport implements
ExecutorSupport {
this.executorRepository =
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel());
}
+ @Override
public Executor getExecutor(Object data) {
return executorRepository.getExecutor(url);
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index 7244fa38fb..39e5cceb74 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -180,7 +180,7 @@ public class AsyncRpcResult implements Result {
*/
@Override
public Result get() throws InterruptedException, ExecutionException {
- if (executor != null && executor instanceof ThreadlessExecutor) {
+ if (executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor)
executor;
try {
while (!responseFuture.isDone() &&
!threadlessExecutor.isShutdown()) {
@@ -196,7 +196,7 @@ public class AsyncRpcResult implements Result {
@Override
public Result get(long timeout, TimeUnit unit) throws
InterruptedException, ExecutionException, TimeoutException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
- if (executor != null && executor instanceof ThreadlessExecutor) {
+ if (executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor)
executor;
try {
while (!responseFuture.isDone() &&
!threadlessExecutor.isShutdown()) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
index 32037a59cc..08160dcd74 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
@@ -47,6 +47,9 @@ public class DeadlineFuture extends
CompletableFuture<AppResponse> {
private final List<Runnable> timeoutListeners = new ArrayList<>();
private final Timeout timeoutTask;
private ExecutorService executor;
+ private static final GlobalResourceInitializer<Timer> TIME_OUT_TIMER = new
GlobalResourceInitializer<>(
+ () -> new HashedWheelTimer(new
NamedThreadFactory("dubbo-future-timeout", true), 30,
+ TimeUnit.MILLISECONDS), DeadlineFuture::destroy);
private DeadlineFuture(String serviceName, String methodName, String
address, int timeout) {
this.serviceName = serviceName;
@@ -76,20 +79,15 @@ public class DeadlineFuture extends
CompletableFuture<AppResponse> {
}
public void received(TriRpcStatus status, AppResponse appResponse) {
- if (status.code != TriRpcStatus.Code.DEADLINE_EXCEEDED) {
- // decrease Time
- if (!timeoutTask.isCancelled()) {
- timeoutTask.cancel();
- }
+ if (status.code != TriRpcStatus.Code.DEADLINE_EXCEEDED &&
!timeoutTask.isCancelled()) {
+ timeoutTask.cancel();
}
if (getExecutor() != null) {
getExecutor().execute(() -> doReceived(status, appResponse));
} else {
doReceived(status, appResponse);
}
- } private static final GlobalResourceInitializer<Timer> TIME_OUT_TIMER
= new GlobalResourceInitializer<>(
- () -> new HashedWheelTimer(new
NamedThreadFactory("dubbo-future-timeout", true), 30,
- TimeUnit.MILLISECONDS), DeadlineFuture::destroy);
+ }
public void addTimeoutListener(Runnable runnable) {
timeoutListeners.add(runnable);