This is an automated email from the ASF dual-hosted git repository.
jefflv pushed a commit to branch 3.x-dev
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/3.x-dev by this push:
new 2484877 Reduce context switching cost by optimizing thread model on
consumer side. (#4106)
2484877 is described below
commit 2484877189c89e12c0ff67504838b702153aa016
Author: ken.lj <[email protected]>
AuthorDate: Tue May 21 14:26:46 2019 +0800
Reduce context switching cost by optimizing thread model on consumer side.
(#4106)
* introduce executor manager
* consumer thread model
* optimize consumer side thread model
---
.../common/threadpool/ThreadlessExecutor.java | 150 +++++++++++++++++++++
.../manager/DefaultExecutorRepository.java | 107 +++++++++++++++
.../threadpool/manager/ExecutorRepository.java | 63 +++++++++
.../dubbo/common/threadpool/manager/Ring.java | 50 +++++++
...bo.common.threadpool.manager.ExecutorRepository | 1 +
.../dubbo/demo/provider/DemoServiceImpl.java | 5 +
.../apache/dubbo/registry/dubbo/MockChannel.java | 11 ++
.../apache/dubbo/registry/dubbo/MockedClient.java | 15 ++-
.../dubbo/remoting/exchange/ExchangeChannel.java | 22 +++
.../remoting/exchange/support/DefaultFuture.java | 44 ++++--
.../support/header/HeaderExchangeChannel.java | 15 ++-
.../support/header/HeaderExchangeClient.java | 11 ++
.../dubbo/remoting/transport/AbstractClient.java | 9 +-
.../dubbo/remoting/transport/AbstractServer.java | 34 +----
.../dispatcher/WrappedChannelHandler.java | 85 +++++++-----
.../dispatcher/all/AllChannelHandler.java | 30 ++---
.../ConnectionOrderedChannelHandler.java | 21 +--
.../DirectChannelHandler.java} | 21 +--
.../dispatcher/direct/DirectDispatcher.java | 2 +-
.../execution/ExecutionChannelHandler.java | 20 +--
.../message/MessageOnlyChannelHandler.java | 4 +-
.../exchange/support/DefaultFutureTest.java | 4 +-
.../support/header/HeaderExchangeChannelTest.java | 12 +-
.../java/org/apache/dubbo/rpc/AsyncRpcResult.java | 63 ++++++---
.../java/org/apache/dubbo/rpc}/FutureAdapter.java | 15 ++-
.../apache/dubbo/rpc/protocol/AbstractInvoker.java | 28 +++-
.../rpc/protocol/dubbo/DecodeableRpcResult.java | 1 +
.../dubbo/rpc/protocol/dubbo/DubboInvoker.java | 6 +-
.../protocol/dubbo/LazyConnectExchangeClient.java | 15 +++
.../dubbo/ReferenceCountExchangeClient.java | 11 ++
.../dubbo/rpc/protocol/thrift/ThriftInvoker.java | 3 -
31 files changed, 696 insertions(+), 182 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
new file mode 100644
index 0000000..322d8d9
--- /dev/null
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.threadpool;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * The most important difference between this Executor and other normal
Executor is that this one doesn't manage
+ * any thread.
+ *
+ * Tasks submitted to this executor through {@link #execute(Runnable)} will
not get scheduled to a specific thread, though normal executors always do the
schedule.
+ * Those tasks are stored in a blocking queue and will only be executed when a
thead calls {@link #waitAndDrain()}, the thead executing the task
+ * is exactly the same as the one calling waitAndDrain.
+ */
+public class ThreadlessExecutor extends AbstractExecutorService {
+ private static final Logger logger =
LoggerFactory.getLogger(ThreadlessExecutor.class.getName());
+
+ private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+
+ private ExecutorService sharedExecutor;
+
+ private volatile boolean waiting = true;
+
+ private final Object lock = new Object();
+
+ public ThreadlessExecutor(ExecutorService sharedExecutor) {
+ this.sharedExecutor = sharedExecutor;
+ }
+
+ public boolean isWaiting() {
+ return waiting;
+ }
+
+ /**
+ * Waits until there is a Runnable, then executes it and all queued
Runnables after it.
+ */
+ public void waitAndDrain() throws InterruptedException {
+ Runnable runnable = queue.take();
+
+ synchronized (lock) {
+ waiting = false;
+ runnable.run();
+ }
+
+ runnable = queue.poll();
+ while (runnable != null) {
+ try {
+ runnable.run();
+ } catch (Throwable t) {
+ logger.info(t);
+
+ }
+ runnable = queue.poll();
+ }
+ }
+
+ public long waitAndDrain(long timeout, TimeUnit unit) throws
InterruptedException, TimeoutException {
+ /*long startInMs = System.currentTimeMillis();
+ Runnable runnable = queue.poll(timeout, unit);
+ if (runnable == null) {
+ throw new TimeoutException();
+ }
+ runnable.run();
+ long elapsedInMs = System.currentTimeMillis() - startInMs;
+ long timeLeft = timeout - elapsedInMs;
+ if (timeLeft < 0) {
+ throw new TimeoutException();
+ }
+ return timeLeft;*/
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * If the calling thread is still waiting for a callback task, add the
task into the blocking queue to wait for schedule.
+ * Otherwise, submit to shared callback executor directly.
+ *
+ * @param runnable
+ */
+ @Override
+ public void execute(Runnable runnable) {
+ synchronized (lock) {
+ if (!waiting) {
+ sharedExecutor.execute(runnable);
+ } else {
+ queue.add(runnable);
+ }
+ }
+ }
+
+ /**
+ * tells the thread blocking on {@link #waitAndDrain()} to return, despite
of the current status, to avoid endless waiting.
+ */
+ public void notifyReturn() {
+ // an empty runnable task.
+ execute(() -> {
+ });
+ }
+
+ /**
+ * The following methods are still not supported
+ */
+
+ @Override
+ public void shutdown() {
+
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return null;
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return false;
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return false;
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException {
+ return false;
+ }
+}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
new file mode 100644
index 0000000..327cdf1
--- /dev/null
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.threadpool.manager;
+
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.ThreadPool;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * Consider implementing {@link Licycle} to enable executors shutdown when the
process stops.
+ */
+public class DefaultExecutorRepository implements ExecutorRepository {
+ private static final Logger logger =
LoggerFactory.getLogger(DefaultExecutorRepository.class);
+
+ private int DEFAULT_SCHEDULER_SIZE =
Runtime.getRuntime().availableProcessors();
+
+ private final ExecutorService SHARED_EXECUTOR =
Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler",
true));
+
+ private Ring<ScheduledExecutorService> scheduledExecutors = new Ring<>();
+
+ private ScheduledExecutorService reconnectScheduledExecutor;
+
+ private ConcurrentMap<String, ConcurrentMap<String, ExecutorService>> data
= new ConcurrentHashMap<>();
+
+ public DefaultExecutorRepository() {
+ for (int i = 0; i < DEFAULT_SCHEDULER_SIZE; i++) {
+ ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("Dubbo-framework-scheduler"));
+ scheduledExecutors.addItem(scheduler);
+ }
+
+ reconnectScheduledExecutor =
Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("Dubbo-reconnect-scheduler"));
+ }
+
+ public ExecutorService createExecutorIfAbsent(URL url) {
+ String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
+ if
(Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY)))
{
+ componentKey = Constants.CONSUMER_SIDE;
+ }
+ Map<String, ExecutorService> executors =
data.computeIfAbsent(componentKey, k -> new ConcurrentHashMap<>());
+ return executors.computeIfAbsent(Integer.toString(url.getPort()), k ->
(ExecutorService)
ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url));
+ }
+
+ @Override
+ public void updateThreadpool(URL url, ExecutorService executor) {
+ try {
+ if (url.hasParameter(Constants.THREADS_KEY)
+ && executor instanceof ThreadPoolExecutor &&
!executor.isShutdown()) {
+ ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)
executor;
+ int threads = url.getParameter(Constants.THREADS_KEY, 0);
+ int max = threadPoolExecutor.getMaximumPoolSize();
+ int core = threadPoolExecutor.getCorePoolSize();
+ if (threads > 0 && (threads != max || threads != core)) {
+ if (threads < core) {
+ threadPoolExecutor.setCorePoolSize(threads);
+ if (core == max) {
+ threadPoolExecutor.setMaximumPoolSize(threads);
+ }
+ } else {
+ threadPoolExecutor.setMaximumPoolSize(threads);
+ if (core == max) {
+ threadPoolExecutor.setCorePoolSize(threads);
+ }
+ }
+ }
+ }
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
+ }
+ }
+
+ @Override
+ public ScheduledExecutorService nextScheduledExecutor() {
+ return scheduledExecutors.pollItem();
+ }
+
+ @Override
+ public ExecutorService getSharedExecutor() {
+ return SHARED_EXECUTOR;
+ }
+
+}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
new file mode 100644
index 0000000..1d95a87
--- /dev/null
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.threadpool.manager;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.SPI;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ *
+ */
+@SPI("default")
+public interface ExecutorRepository {
+
+ /**
+ * Called by both Client and Server. TODO, consider separate these two
parts.
+ * When the Client or Server starts for the first time, generate a new
threadpool according to the parameters passed in usr.
+ *
+ * @param url
+ * @return
+ */
+ ExecutorService createExecutorIfAbsent(URL url);
+
+ /**
+ * Modify some of the threadpool's properties according to the url, for
example, coreSize, maxSize, ...
+ *
+ * @param url
+ * @param executor
+ */
+ void updateThreadpool(URL url, ExecutorService executor);
+
+ /**
+ * Returns a scheduler from the scheduler list, call this method whenever
you need a scheduler for a cron job.
+ * If your cron cannot burden the possible schedule delay caused by
sharing the same scheduler, please consider define a dedicate one.
+ *
+ * @return
+ */
+ ScheduledExecutorService nextScheduledExecutor();
+
+ /**
+ * Get the default shared threadpool.
+ *
+ * @return
+ */
+ ExecutorService getSharedExecutor();
+
+}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/Ring.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/Ring.java
new file mode 100644
index 0000000..1ddc0fd
--- /dev/null
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/Ring.java
@@ -0,0 +1,50 @@
+package org.apache.dubbo.common.threadpool.manager;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class Ring<T> {
+
+ AtomicInteger count = new AtomicInteger();
+
+ private List<T> itemList = new CopyOnWriteArrayList<T>();
+
+ public void addItem(T t) {
+ if (t != null) {
+ itemList.add(t);
+ }
+ }
+
+ public T pollItem() {
+ if (itemList.isEmpty()) {
+ return null;
+ }
+ if (itemList.size() == 1) {
+ return itemList.get(0);
+ }
+
+ if (count.intValue() > Integer.MAX_VALUE - 10000) {
+ count.set(count.get() % itemList.size());
+ }
+
+ int index = Math.abs(count.getAndIncrement()) % itemList.size();
+ return itemList.get(index);
+ }
+
+ public T peekItem() {
+ if (itemList.isEmpty()) {
+ return null;
+ }
+ if (itemList.size() == 1) {
+ return itemList.get(0);
+ }
+ int index = Math.abs(count.get()) % itemList.size();
+ return itemList.get(index);
+ }
+
+ public List<T> listItems() {
+ return Collections.unmodifiableList(itemList);
+ }
+}
diff --git
a/dubbo-common/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.threadpool.manager.ExecutorRepository
b/dubbo-common/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.threadpool.manager.ExecutorRepository
new file mode 100644
index 0000000..44199b0
--- /dev/null
+++
b/dubbo-common/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.threadpool.manager.ExecutorRepository
@@ -0,0 +1 @@
+default=org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository
\ No newline at end of file
diff --git
a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java
b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java
index e95caa6..4856411 100644
---
a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java
+++
b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java
@@ -30,6 +30,11 @@ public class DemoServiceImpl implements DemoService {
@Override
public String sayHello(String name) {
logger.info("Hello " + name + ", request from consumer: " +
RpcContext.getContext().getRemoteAddress());
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
return "Hello " + name + ", response from provider: " +
RpcContext.getContext().getLocalAddress();
}
diff --git
a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java
b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java
index 7a4961b..e563da6 100644
---
a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java
+++
b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java
@@ -24,6 +24,7 @@ import org.apache.dubbo.remoting.exchange.ExchangeHandler;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
public class MockChannel implements ExchangeChannel {
@@ -93,6 +94,16 @@ public class MockChannel implements ExchangeChannel {
return null;
}
+ @Override
+ public CompletableFuture<Object> request(Object request, ExecutorService
executor) throws RemotingException {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Object> request(Object request, int timeout,
ExecutorService executor) throws RemotingException {
+ return null;
+ }
+
public ExchangeHandler getExchangeHandler() {
return null;
}
diff --git
a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java
b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java
index 1afe0d5..ad06cd2 100644
---
a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java
+++
b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java
@@ -29,6 +29,7 @@ import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
/**
@@ -81,13 +82,23 @@ public class MockedClient implements ExchangeClient {
}
public CompletableFuture<Object> request(Object msg) throws
RemotingException {
- return request(msg, 0);
+ return request(msg, null);
}
public CompletableFuture<Object> request(Object msg, int timeout) throws
RemotingException {
+ return this.request(msg, timeout, null);
+ }
+
+ @Override
+ public CompletableFuture<Object> request(Object msg, ExecutorService
executor) throws RemotingException {
+ return this.request(msg, 0, executor);
+ }
+
+ @Override
+ public CompletableFuture<Object> request(Object msg, int timeout,
ExecutorService executor) throws RemotingException {
this.invoked = msg;
return new CompletableFuture<Object>() {
- public Object get() throws InterruptedException,
ExecutionException {
+ public Object get() throws InterruptedException,
ExecutionException {
return received;
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java
index 0e4917d..c0cf131 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.RemotingException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
/**
* ExchangeChannel. (API/SPI, Prototype, ThreadSafe)
@@ -33,6 +34,7 @@ public interface ExchangeChannel extends Channel {
* @return response future
* @throws RemotingException
*/
+ @Deprecated
CompletableFuture<Object> request(Object request) throws RemotingException;
/**
@@ -43,9 +45,29 @@ public interface ExchangeChannel extends Channel {
* @return response future
* @throws RemotingException
*/
+ @Deprecated
CompletableFuture<Object> request(Object request, int timeout) throws
RemotingException;
/**
+ * send request.
+ *
+ * @param request
+ * @return response future
+ * @throws RemotingException
+ */
+ CompletableFuture<Object> request(Object request, ExecutorService
executor) throws RemotingException;
+
+ /**
+ * send request.
+ *
+ * @param request
+ * @param timeout
+ * @return response future
+ * @throws RemotingException
+ */
+ CompletableFuture<Object> request(Object request, int timeout,
ExecutorService executor) throws RemotingException;
+
+ /**
* get message handler.
*
* @return message handler
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index e13f651..8385f54 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.remoting.exchange.support;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import org.apache.dubbo.common.timer.HashedWheelTimer;
import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.common.timer.Timer;
@@ -35,6 +36,7 @@ import java.util.Date;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
@@ -63,6 +65,16 @@ public class DefaultFuture extends CompletableFuture<Object>
{
private final long start = System.currentTimeMillis();
private volatile long sent;
+ private ExecutorService executor;
+
+ public ExecutorService getExecutor() {
+ return executor;
+ }
+
+ public void setExecutor(ExecutorService executor) {
+ this.executor = executor;
+ }
+
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
@@ -83,8 +95,9 @@ public class DefaultFuture extends CompletableFuture<Object> {
* @param timeout timeout
* @return a new DefaultFuture
*/
- public static DefaultFuture newFuture(Channel channel, Request request,
int timeout) {
+ public static DefaultFuture newFuture(Channel channel, Request request,
int timeout, ExecutorService executor) {
final DefaultFuture future = new DefaultFuture(channel, request,
timeout);
+ future.setExecutor(executor);
// timeout check
timeoutCheck(future);
return future;
@@ -165,7 +178,6 @@ public class DefaultFuture extends
CompletableFuture<Object> {
this.cancel(true);
}
-
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
@@ -177,6 +189,15 @@ public class DefaultFuture extends
CompletableFuture<Object> {
} else {
this.completeExceptionally(new RemotingException(channel,
res.getErrorMessage()));
}
+
+ // the result is returning, but the caller thread may still waiting
+ // to avoid endless waiting for whatever reason, notify caller thread
to return.
+ if (executor != null && executor instanceof ThreadlessExecutor) {
+ ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor)
executor;
+ if (threadlessExecutor.isWaiting()) {
+ threadlessExecutor.notifyReturn();
+ }
+ }
}
private long getId() {
@@ -240,14 +261,17 @@ public class DefaultFuture extends
CompletableFuture<Object> {
if (future.isDone()) {
return;
}
- // create exception response.
- Response timeoutResponse = new Response(future.getId());
- // set timeout status.
- timeoutResponse.setStatus(future.isSent() ?
Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
- timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
- // handle response.
- DefaultFuture.received(future.getChannel(), timeoutResponse);
-
+ if (future.getExecutor() != null) {
+ future.getExecutor().execute(() -> {
+ // create exception response.
+ Response timeoutResponse = new Response(future.getId());
+ // set timeout status.
+ timeoutResponse.setStatus(future.isSent() ?
Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
+
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
+ // handle response.
+ DefaultFuture.received(future.getChannel(),
timeoutResponse);
+ });
+ }
}
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
index 1666bbf..e60e24f 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
@@ -32,6 +32,7 @@ import
org.apache.dubbo.remoting.exchange.support.DefaultFuture;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
/**
* ExchangeReceiver
@@ -98,11 +99,21 @@ final class HeaderExchangeChannel implements
ExchangeChannel {
@Override
public CompletableFuture<Object> request(Object request) throws
RemotingException {
- return request(request,
channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY,
Constants.DEFAULT_TIMEOUT));
+ return request(request, null);
}
@Override
public CompletableFuture<Object> request(Object request, int timeout)
throws RemotingException {
+ return request(request, timeout, null);
+ }
+
+ @Override
+ public CompletableFuture<Object> request(Object request, ExecutorService
executor) throws RemotingException {
+ return request(request,
channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY,
Constants.DEFAULT_TIMEOUT), executor);
+ }
+
+ @Override
+ public CompletableFuture<Object> request(Object request, int timeout,
ExecutorService executor) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed
to send request " + request + ", cause: The channel " + this + " is closed!");
}
@@ -111,7 +122,7 @@ final class HeaderExchangeChannel implements
ExchangeChannel {
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
- DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
+ DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout,
executor);
try {
channel.send(req);
} catch (RemotingException e) {
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
index 5b99079..b5720f4 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
@@ -31,6 +31,7 @@ import org.apache.dubbo.remoting.exchange.ExchangeHandler;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.utils.UrlUtils.getHeartbeat;
@@ -82,6 +83,16 @@ public class HeaderExchangeClient implements ExchangeClient {
}
@Override
+ public CompletableFuture<Object> request(Object request, ExecutorService
executor) throws RemotingException {
+ return channel.request(request, executor);
+ }
+
+ @Override
+ public CompletableFuture<Object> request(Object request, int timeout,
ExecutorService executor) throws RemotingException {
+ return channel.request(request, timeout, executor);
+ }
+
+ @Override
public ChannelHandler getChannelHandler() {
return channel.getChannelHandler();
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
index 2afdc4d..5408759 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
@@ -22,7 +22,7 @@ import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.store.DataStore;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
@@ -46,6 +46,7 @@ public abstract class AbstractClient extends AbstractEndpoint
implements Client
private final Lock connectLock = new ReentrantLock();
private final boolean needReconnect;
protected volatile ExecutorService executor;
+ private ExecutorRepository executorRepository =
ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
public AbstractClient(URL url, ChannelHandler handler) throws
RemotingException {
super(url, handler);
@@ -80,11 +81,7 @@ public abstract class AbstractClient extends
AbstractEndpoint implements Client
"Failed to start " + getClass().getSimpleName() + " " +
NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() +
", cause: " + t.getMessage(), t);
}
-
- executor = (ExecutorService)
ExtensionLoader.getExtensionLoader(DataStore.class)
- .getDefaultExtension().get(Constants.CONSUMER_SIDE,
Integer.toString(url.getPort()));
- ExtensionLoader.getExtensionLoader(DataStore.class)
- .getDefaultExtension().remove(Constants.CONSUMER_SIDE,
Integer.toString(url.getPort()));
+ executor = executorRepository.createExecutorIfAbsent(url);
}
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler
handler) {
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
index 2020fee..aac25f5 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
@@ -21,7 +21,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.store.DataStore;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
@@ -32,7 +32,6 @@ import org.apache.dubbo.remoting.Server;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
/**
* AbstractServer
@@ -47,6 +46,8 @@ public abstract class AbstractServer extends AbstractEndpoint
implements Server
private int accepts;
private int idleTimeout;
+ private ExecutorRepository executorRepository =
ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+
public AbstractServer(URL url, ChannelHandler handler) throws
RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
@@ -68,9 +69,7 @@ public abstract class AbstractServer extends AbstractEndpoint
implements Server
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " +
t.getMessage(), t);
}
- //fixme replace this with better method
- DataStore dataStore =
ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
- executor = (ExecutorService)
dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY,
Integer.toString(url.getPort()));
+ executor = executorRepository.createExecutorIfAbsent(url);
}
protected abstract void doOpen() throws Throwable;
@@ -102,30 +101,7 @@ public abstract class AbstractServer extends
AbstractEndpoint implements Server
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
- try {
- if (url.hasParameter(Constants.THREADS_KEY)
- && executor instanceof ThreadPoolExecutor &&
!executor.isShutdown()) {
- ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)
executor;
- int threads = url.getParameter(Constants.THREADS_KEY, 0);
- int max = threadPoolExecutor.getMaximumPoolSize();
- int core = threadPoolExecutor.getCorePoolSize();
- if (threads > 0 && (threads != max || threads != core)) {
- if (threads < core) {
- threadPoolExecutor.setCorePoolSize(threads);
- if (core == max) {
- threadPoolExecutor.setMaximumPoolSize(threads);
- }
- } else {
- threadPoolExecutor.setMaximumPoolSize(threads);
- if (core == max) {
- threadPoolExecutor.setCorePoolSize(threads);
- }
- }
- }
- }
- } catch (Throwable t) {
- logger.error(t.getMessage(), t);
- }
+ executorRepository.updateThreadpool(url, executor);
super.setUrl(getUrl().addParameters(url.getParameters()));
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java
index 072f998..a709a45 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java
@@ -16,30 +16,25 @@
*/
package org.apache.dubbo.remoting.transport.dispatcher;
-import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.store.DataStore;
-import org.apache.dubbo.common.threadpool.ThreadPool;
-import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.Request;
+import org.apache.dubbo.remoting.exchange.Response;
+import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
import org.apache.dubbo.remoting.transport.ChannelHandlerDelegate;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
public class WrappedChannelHandler implements ChannelHandlerDelegate {
protected static final Logger logger =
LoggerFactory.getLogger(WrappedChannelHandler.class);
- protected static final ExecutorService SHARED_EXECUTOR =
Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler",
true));
-
- protected final ExecutorService executor;
-
protected final ChannelHandler handler;
protected final URL url;
@@ -47,24 +42,10 @@ public class WrappedChannelHandler implements
ChannelHandlerDelegate {
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
- executor = (ExecutorService)
ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
-
- String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
- if
(Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY)))
{
- componentKey = Constants.CONSUMER_SIDE;
- }
- DataStore dataStore =
ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
- dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
public void close() {
- try {
- if (executor != null) {
- executor.shutdown();
- }
- } catch (Throwable t) {
- logger.warn("fail to destroy thread pool of server: " +
t.getMessage(), t);
- }
+
}
@Override
@@ -92,8 +73,16 @@ public class WrappedChannelHandler implements
ChannelHandlerDelegate {
handler.caught(channel, exception);
}
- public ExecutorService getExecutor() {
- return executor;
+ protected void sendFeedback(Channel channel, Request request, Throwable t)
throws RemotingException {
+ if (request.isTwoWay()) {
+ String msg = "Server side(" + url.getIp() + "," + url.getPort()
+ + ") thread pool is exhausted, detail msg:" +
t.getMessage();
+ Response response = new Response(request.getId(),
request.getVersion());
+ response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
+ response.setErrorMessage(msg);
+ channel.send(response);
+ return;
+ }
}
@Override
@@ -109,12 +98,46 @@ public class WrappedChannelHandler implements
ChannelHandlerDelegate {
return url;
}
- public ExecutorService getExecutorService() {
- ExecutorService cexecutor = executor;
- if (cexecutor == null || cexecutor.isShutdown()) {
- cexecutor = SHARED_EXECUTOR;
+ /**
+ * Currently, this method is mainly customized to facilitate the thread
model on consumer side.
+ * 1. Use ThreadlessExecutor, aka., delegate callback directly to the
thread initiating the call.
+ * 2. Use shared executor to execute the callback.
+ *
+ * @param msg
+ * @return
+ */
+ public ExecutorService getPreferredExecutorService(Object msg) {
+ if (msg instanceof Response) {
+ Response response = (Response) msg;
+ DefaultFuture responseFuture =
DefaultFuture.getFuture(response.getId());
+ // a typical scenario is the response returned after timeout, the
timeout response may has completed the future
+ if (responseFuture == null) {
+ return getSharedExecutorService();
+ } else {
+ ExecutorService executor = responseFuture.getExecutor();
+ if (executor == null || executor.isShutdown()) {
+ executor = getSharedExecutorService();
+ }
+ return executor;
+ }
+ } else {
+ return getSharedExecutorService();
}
- return cexecutor;
}
+ /**
+ * get the shared executor for current Server or Client
+ *
+ * @return
+ */
+ public ExecutorService getSharedExecutorService() {
+ return
ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().createExecutorIfAbsent(url);
+ }
+
+ @Deprecated
+ public ExecutorService getExecutorService() {
+ return getSharedExecutorService();
+ }
+
+
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllChannelHandler.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllChannelHandler.java
index 88431fd..25158d0 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllChannelHandler.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllChannelHandler.java
@@ -22,7 +22,6 @@ import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.ExecutionException;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.Request;
-import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable;
import
org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.ChannelState;
import org.apache.dubbo.remoting.transport.dispatcher.WrappedChannelHandler;
@@ -38,9 +37,9 @@ public class AllChannelHandler extends WrappedChannelHandler {
@Override
public void connected(Channel channel) throws RemotingException {
- ExecutorService cexecutor = getExecutorService();
+ ExecutorService executor = getExecutorService();
try {
- cexecutor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.CONNECTED));
+ executor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass()
+ " error when process connected event .", t);
}
@@ -48,9 +47,9 @@ public class AllChannelHandler extends WrappedChannelHandler {
@Override
public void disconnected(Channel channel) throws RemotingException {
- ExecutorService cexecutor = getExecutorService();
+ ExecutorService executor = getExecutorService();
try {
- cexecutor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.DISCONNECTED));
+ executor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnect event", channel,
getClass() + " error when process disconnected event .", t);
}
@@ -58,22 +57,13 @@ public class AllChannelHandler extends
WrappedChannelHandler {
@Override
public void received(Channel channel, Object message) throws
RemotingException {
- ExecutorService cexecutor = getExecutorService();
+ ExecutorService executor = getPreferredExecutorService(message);
try {
- cexecutor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.RECEIVED, message));
+ executor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.RECEIVED, message));
} catch (Throwable t) {
- //TODO A temporary solution to the problem that the exception
information can not be sent to the opposite end after the thread pool is full.
Need a refactoring
- //fix The thread pool is full, refuses to call, does not return,
and causes the consumer to wait for time out
if(message instanceof Request && t instanceof
RejectedExecutionException){
- Request request = (Request)message;
- if(request.isTwoWay()){
- String msg = "Server side(" + url.getIp() + ","
+ url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
- Response response = new
Response(request.getId(), request.getVersion());
-
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
- response.setErrorMessage(msg);
- channel.send(response);
- return;
- }
+ sendFeedback(channel, (Request) message, t);
+ return;
}
throw new ExecutionException(message, channel, getClass() + "
error when process received event .", t);
}
@@ -81,9 +71,9 @@ public class AllChannelHandler extends WrappedChannelHandler {
@Override
public void caught(Channel channel, Throwable exception) throws
RemotingException {
- ExecutorService cexecutor = getExecutorService();
+ ExecutorService executor = getExecutorService();
try {
- cexecutor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.CAUGHT, exception));
+ executor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() +
" error when process caught event .", t);
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedChannelHandler.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedChannelHandler.java
index a40d432..6d5f06d 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedChannelHandler.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedChannelHandler.java
@@ -25,7 +25,6 @@ import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.ExecutionException;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.Request;
-import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable;
import
org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.ChannelState;
import org.apache.dubbo.remoting.transport.dispatcher.WrappedChannelHandler;
@@ -75,21 +74,13 @@ public class ConnectionOrderedChannelHandler extends
WrappedChannelHandler {
@Override
public void received(Channel channel, Object message) throws
RemotingException {
- ExecutorService cexecutor = getExecutorService();
+ ExecutorService executor = getPreferredExecutorService(message);
try {
- cexecutor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.RECEIVED, message));
+ executor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.RECEIVED, message));
} catch (Throwable t) {
- //fix, reject exception can not be sent to consumer because thread
pool is full, resulting in consumers waiting till timeout.
if (message instanceof Request && t instanceof
RejectedExecutionException) {
- Request request = (Request) message;
- if (request.isTwoWay()) {
- String msg = "Server side(" + url.getIp() + "," +
url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
- Response response = new Response(request.getId(),
request.getVersion());
-
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
- response.setErrorMessage(msg);
- channel.send(response);
- return;
- }
+ sendFeedback(channel, (Request) message, t);
+ return;
}
throw new ExecutionException(message, channel, getClass() + "
error when process received event .", t);
}
@@ -97,9 +88,9 @@ public class ConnectionOrderedChannelHandler extends
WrappedChannelHandler {
@Override
public void caught(Channel channel, Throwable exception) throws
RemotingException {
- ExecutorService cexecutor = getExecutorService();
+ ExecutorService executor = getExecutorService();
try {
- cexecutor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.CAUGHT, exception));
+ executor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() +
" error when process caught event .", t);
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectChannelHandler.java
similarity index 66%
copy from
dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java
copy to
dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectChannelHandler.java
index 3e756c7..9bb8d1f 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectChannelHandler.java
@@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.transport.dispatcher.message;
+package org.apache.dubbo.remoting.transport.dispatcher.direct;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.ExecutionException;
@@ -27,19 +28,23 @@ import
org.apache.dubbo.remoting.transport.dispatcher.WrappedChannelHandler;
import java.util.concurrent.ExecutorService;
-public class MessageOnlyChannelHandler extends WrappedChannelHandler {
+public class DirectChannelHandler extends WrappedChannelHandler {
- public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
+ public DirectChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
@Override
public void received(Channel channel, Object message) throws
RemotingException {
- ExecutorService cexecutor = getExecutorService();
- try {
- cexecutor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.RECEIVED, message));
- } catch (Throwable t) {
- throw new ExecutionException(message, channel, getClass() + "
error when process received event .", t);
+ ExecutorService executor = getPreferredExecutorService(message);
+ if (executor instanceof ThreadlessExecutor) {
+ try {
+ executor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.RECEIVED, message));
+ } catch (Throwable t) {
+ throw new ExecutionException(message, channel, getClass() + "
error when process received event .", t);
+ }
+ } else {
+ handler.received(channel, message);
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectDispatcher.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectDispatcher.java
index f18065d..aaed4e7 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectDispatcher.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/direct/DirectDispatcher.java
@@ -29,7 +29,7 @@ public class DirectDispatcher implements Dispatcher {
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
- return handler;
+ return new DirectChannelHandler(handler, url);
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java
index ac588f3..761e26c 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java
@@ -17,12 +17,12 @@
package org.apache.dubbo.remoting.transport.dispatcher.execution;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.ExecutionException;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.Request;
-import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable;
import
org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.ChannelState;
import org.apache.dubbo.remoting.transport.dispatcher.WrappedChannelHandler;
@@ -42,28 +42,22 @@ public class ExecutionChannelHandler extends
WrappedChannelHandler {
@Override
public void received(Channel channel, Object message) throws
RemotingException {
- ExecutorService cexecutor = getExecutorService();
+ ExecutorService executor = getPreferredExecutorService(message);
+
if (message instanceof Request) {
try {
- cexecutor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.RECEIVED, message));
+ executor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.RECEIVED, message));
} catch (Throwable t) {
// FIXME: when the thread pool is full,
SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,
// therefore the consumer side has to wait until gets timeout.
This is a temporary solution to prevent
// this scenario from happening, but a better solution should
be considered later.
if (t instanceof RejectedExecutionException) {
- Request request = (Request) message;
- if (request.isTwoWay()) {
- String msg = "Server side(" + url.getIp() + "," +
url.getPort()
- + ") thread pool is exhausted, detail msg:" +
t.getMessage();
- Response response = new Response(request.getId(),
request.getVersion());
-
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
- response.setErrorMessage(msg);
- channel.send(response);
- return;
- }
+ sendFeedback(channel, (Request) message, t);
}
throw new ExecutionException(message, channel, getClass() + "
error when process received event.", t);
}
+ } else if (executor instanceof ThreadlessExecutor) {
+ executor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.RECEIVED, message));
} else {
handler.received(channel, message);
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java
index 3e756c7..2cd20cc 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/message/MessageOnlyChannelHandler.java
@@ -35,9 +35,9 @@ public class MessageOnlyChannelHandler extends
WrappedChannelHandler {
@Override
public void received(Channel channel, Object message) throws
RemotingException {
- ExecutorService cexecutor = getExecutorService();
+ ExecutorService executor = getPreferredExecutorService(message);
try {
- cexecutor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.RECEIVED, message));
+ executor.execute(new ChannelEventRunnable(channel, handler,
ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + "
error when process received event .", t);
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
index 2dd4005..0f19d15 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
@@ -95,7 +95,7 @@ public class DefaultFutureTest {
// timeout after 5 seconds.
Channel channel = new MockedChannel();
Request request = new Request(10);
- DefaultFuture f = DefaultFuture.newFuture(channel, request, 5000);
+ DefaultFuture f = DefaultFuture.newFuture(channel, request, 5000,
null);
//mark the future is sent
DefaultFuture.sent(channel, request);
while (!f.isDone()) {
@@ -119,7 +119,7 @@ public class DefaultFutureTest {
private DefaultFuture defaultFuture(int timeout) {
Channel channel = new MockedChannel();
Request request = new Request(index.getAndIncrement());
- return DefaultFuture.newFuture(channel, request, timeout);
+ return DefaultFuture.newFuture(channel, request, timeout, null);
}
}
\ No newline at end of file
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java
index 92739bf..2affe7d 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java
@@ -17,23 +17,21 @@
package org.apache.dubbo.remoting.exchange.support.header;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-
-import org.apache.dubbo.remoting.Channel;
-import org.apache.dubbo.remoting.RemotingException;
-import org.apache.dubbo.remoting.exchange.Request;
-
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.util.List;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class HeaderExchangeChannelTest {
@@ -193,7 +191,7 @@ public class HeaderExchangeChannelTest {
public void closeWithTimeoutTest02() {
Assertions.assertFalse(channel.isClosed());
Request request = new Request();
- DefaultFuture.newFuture(channel, request, 100);
+ DefaultFuture.newFuture(channel, request, 100, null);
header.close(100);
//return directly
header.close(1000);
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index 9546e2a..067f896 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -18,10 +18,14 @@ package org.apache.dubbo.rpc;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Function;
/**
@@ -47,6 +51,7 @@ public class AsyncRpcResult extends AbstractResult {
*/
private RpcContext storedContext;
private RpcContext storedServerContext;
+ private Executor executor;
private Invocation invocation;
@@ -108,34 +113,46 @@ public class AsyncRpcResult extends AbstractResult {
return new AppResponse();
}
+ /**
+ * This method will always return after a maximum 'timeout' waiting:
+ * 1. if value returns before timeout, return normally.
+ * 2. if no value returns after timeout, throw TimeoutException.
+ *
+ * @return
+ * @throws InterruptedException
+ * @throws ExecutionException
+ */
+ @Override
+ public Result get() throws InterruptedException, ExecutionException {
+ if (executor != null) {
+ ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor)
executor;
+ threadlessExecutor.waitAndDrain();
+ }
+ return super.get();
+ }
+
+ @Override
+ public Result get(long timeout, TimeUnit unit) throws
InterruptedException, ExecutionException, TimeoutException {
+ return this.get();
+ }
+
@Override
public Object recreate() throws Throwable {
RpcInvocation rpcInvocation = (RpcInvocation) invocation;
if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
- AppResponse appResponse = new AppResponse();
- CompletableFuture<Object> future = new CompletableFuture<>();
- appResponse.setValue(future);
- this.whenComplete((result, t) -> {
- if (t != null) {
- if (t instanceof CompletionException) {
- t = t.getCause();
- }
- future.completeExceptionally(t);
- } else {
- if (result.hasException()) {
- future.completeExceptionally(result.getException());
- } else {
- future.complete(result.getValue());
- }
- }
- });
- return appResponse.recreate();
+ return RpcContext.getContext().getFuture();
} else if (this.isDone()) {
return this.get().recreate();
}
return (new AppResponse()).recreate();
}
+ /**
+ * register a callback which will be executed under the same context when
the RPC call returns.
+ *
+ * @param fn
+ * @return
+ */
public Result thenApplyWithContext(Function<Result, Result> fn) {
CompletableFuture<Result> future =
this.thenApply(fn.compose(beforeContext).andThen(afterContext));
AsyncRpcResult nextAsyncRpcResult = new AsyncRpcResult(this);
@@ -195,6 +212,14 @@ public class AsyncRpcResult extends AbstractResult {
return invocation;
}
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ public void setExecutor(Executor executor) {
+ this.executor = executor;
+ }
+
/**
* tmp context to use when the thread switch to Dubbo thread.
*/
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureAdapter.java
similarity index 86%
rename from
dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
rename to
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureAdapter.java
index 03954d1..d50f91e 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/FutureAdapter.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/FutureAdapter.java
@@ -14,11 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.rpc.protocol.dubbo;
-
-import org.apache.dubbo.rpc.AppResponse;
-import org.apache.dubbo.rpc.RpcException;
+package org.apache.dubbo.rpc;
+import java.lang.ref.SoftReference;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@@ -32,7 +30,10 @@ public class FutureAdapter<V> extends CompletableFuture<V> {
private CompletableFuture<AppResponse> appResponseFuture;
- public FutureAdapter(CompletableFuture<AppResponse> future) {
+ private SoftReference<Invocation> invocationSoftReference;
+
+ public FutureAdapter(CompletableFuture<AppResponse> future, Invocation
invocation) {
+ this.invocationSoftReference = new SoftReference<>(invocation);
this.appResponseFuture = future;
future.whenComplete((appResponse, t) -> {
if (t != null) {
@@ -53,6 +54,10 @@ public class FutureAdapter<V> extends CompletableFuture<V> {
// TODO figure out the meaning of cancel in DefaultFuture.
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
+// Invocation invocation = invocationSoftReference.get();
+// if (invocation != null) {
+// invocation.getInvoker().invoke(cancel);
+// }
return appResponseFuture.cancel(mayInterruptIfRunning);
}
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index 58cb410..ad5d7c9 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -18,13 +18,18 @@ package org.apache.dubbo.rpc.protocol;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
+import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ArrayUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.rpc.AsyncRpcResult;
+import org.apache.dubbo.rpc.FutureAdapter;
import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.InvokeMode;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
@@ -36,6 +41,7 @@ import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -151,26 +157,38 @@ public abstract class AbstractInvoker<T> implements
Invoker<T> {
invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
+ AsyncRpcResult asyncResult;
try {
- return doInvoke(invocation);
+ asyncResult = (AsyncRpcResult) doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
Throwable te = e.getTargetException();
if (te == null) {
- return AsyncRpcResult.newDefaultAsyncResult(null, e,
invocation);
+ asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e,
invocation);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
- return AsyncRpcResult.newDefaultAsyncResult(null, te,
invocation);
+ asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te,
invocation);
}
} catch (RpcException e) {
if (e.isBiz()) {
- return AsyncRpcResult.newDefaultAsyncResult(null, e,
invocation);
+ asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e,
invocation);
} else {
throw e;
}
} catch (Throwable e) {
- return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
+ asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e,
invocation);
+ }
+ RpcContext.getContext().setFuture(new FutureAdapter(asyncResult, inv));
+ return asyncResult;
+ }
+
+ protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
+ ExecutorService sharedExecutor =
ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().createExecutorIfAbsent(url);
+ if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
+ return new ThreadlessExecutor(sharedExecutor);
+ } else {
+ return sharedExecutor;
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
index e6d3f84..787097e 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
@@ -72,6 +72,7 @@ public class DecodeableRpcResult extends AppResponse
implements Codec, Decodeabl
@Override
public Object decode(Channel channel, InputStream input) throws
IOException {
+ log.debug("Decoding in thread -- " + Thread.currentThread().getName());
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(),
serializationType)
.deserialize(channel.getUrl(), input);
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
index 0f77d65..96925a9 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
@@ -36,6 +36,7 @@ import org.apache.dubbo.rpc.support.RpcUtils;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReentrantLock;
/**
@@ -88,7 +89,9 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
- CompletableFuture<Object> responseFuture =
currentClient.request(inv, timeout);
+ ExecutorService executor = getCallbackExecutor(getUrl(), inv);
+ asyncRpcResult.setExecutor(executor);
+ CompletableFuture<Object> responseFuture =
currentClient.request(inv, timeout, executor);
responseFuture.whenComplete((obj, t) -> {
if (t != null) {
asyncRpcResult.completeExceptionally(t);
@@ -96,7 +99,6 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
asyncRpcResult.complete((AppResponse) obj);
}
});
- RpcContext.getContext().setFuture(new
FutureAdapter(asyncRpcResult));
return asyncRpcResult;
}
} catch (TimeoutException e) {
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
index c11ffca..4a0a4d3 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
@@ -30,6 +30,7 @@ import org.apache.dubbo.remoting.exchange.Exchangers;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -111,6 +112,20 @@ final class LazyConnectExchangeClient implements
ExchangeClient {
return client.request(request, timeout);
}
+ @Override
+ public CompletableFuture<Object> request(Object request, ExecutorService
executor) throws RemotingException {
+ warning();
+ initClient();
+ return client.request(request, executor);
+ }
+
+ @Override
+ public CompletableFuture<Object> request(Object request, int timeout,
ExecutorService executor) throws RemotingException {
+ warning();
+ initClient();
+ return client.request(request, timeout, executor);
+ }
+
/**
* If {@link #REQUEST_WITH_WARNING_KEY} is configured, then warn once
every 5000 invocations.
*/
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
index cb589e2..14bff74 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
@@ -27,6 +27,7 @@ import org.apache.dubbo.remoting.exchange.ExchangeHandler;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -77,6 +78,16 @@ final class ReferenceCountExchangeClient implements
ExchangeClient {
}
@Override
+ public CompletableFuture<Object> request(Object request, ExecutorService
executor) throws RemotingException {
+ return client.request(request, executor);
+ }
+
+ @Override
+ public CompletableFuture<Object> request(Object request, int timeout,
ExecutorService executor) throws RemotingException {
+ return client.request(request, timeout, executor);
+ }
+
+ @Override
public boolean isConnected() {
return client.isConnected();
}
diff --git
a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
index 6eb3c6a..9e71d11 100644
---
a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
@@ -26,11 +26,9 @@ import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
-import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
-import org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -91,7 +89,6 @@ public class ThriftInvoker<T> extends AbstractInvoker<T> {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
CompletableFuture<Object> responseFuture =
currentClient.request(inv, timeout);
asyncRpcResult.subscribeTo(responseFuture);
- RpcContext.getContext().setFuture(new
FutureAdapter(asyncRpcResult));
return asyncRpcResult;
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION,
e.getMessage(), e);