This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new c96a3a1af2 Provider supports thread pool isolation between services
(#10658)
c96a3a1af2 is described below
commit c96a3a1af2d503ca72c9e3e33681cae8d2a97d28
Author: 灼华 <[email protected]>
AuthorDate: Thu Oct 27 17:23:00 2022 +0800
Provider supports thread pool isolation between services (#10658)
---
.../dubbo/common/constants/CommonConstants.java | 9 +
.../manager/DefaultExecutorRepository.java | 76 ++++++---
.../threadpool/manager/ExecutorRepository.java | 22 +++
.../manager/IsolationExecutorRepository.java | 64 +++++++
.../dubbo/common/utils/NamedThreadFactory.java | 5 +
.../apache/dubbo/config/AbstractServiceConfig.java | 17 ++
.../org/apache/dubbo/config/ApplicationConfig.java | 19 +++
.../dubbo/config/annotation/DubboService.java | 6 +
.../executor/AbstractIsolationExecutorSupport.java | 85 +++++++++
.../dubbo/rpc/executor/DefaultExecutorSupport.java | 26 +--
.../apache/dubbo/rpc/executor/ExecutorSupport.java | 20 +--
.../executor/IsolationExecutorSupportFactory.java | 40 +++++
.../apache/dubbo/rpc/model/ApplicationModel.java | 2 +-
...bo.common.threadpool.manager.ExecutorRepository | 3 +-
.../threadpool/manager/ExecutorRepositoryTest.java | 2 +-
.../definition/ServiceDefinitionBuilderTest.java | 17 ++
.../org/apache/dubbo/config/ServiceConfig.java | 22 ++-
.../dubbo/config/bootstrap/DubboBootstrap.java | 2 +-
.../bootstrap/builders/AbstractServiceBuilder.java | 14 ++
.../config/deploy/DefaultApplicationDeployer.java | 4 +-
.../dubbo/config/deploy/DefaultModuleDeployer.java | 2 +-
.../annotation/ServiceAnnotationPostProcessor.java | 7 +-
.../spring/schema/DubboBeanDefinitionParser.java | 3 +
.../src/main/resources/META-INF/compat/dubbo.xsd | 11 ++
.../src/main/resources/META-INF/dubbo.xsd | 11 ++
.../spring/isolation/api/ApiIsolationTest.java | 189 +++++++++++++++++++++
.../config/spring/isolation/spring/BaseTest.java | 106 ++++++++++++
.../spring/annotation/AnnotationIsolationTest.java | 134 +++++++++++++++
.../annotation/consumer/dubbo/DemoServiceV1.java | 31 ++--
.../annotation/consumer/dubbo/HelloServiceV2.java | 27 ++-
.../annotation/consumer/dubbo/HelloServiceV3.java | 27 ++-
.../annotation/consumer/tri/DemoServiceV1.java | 31 ++--
.../annotation/consumer/tri/HelloServiceV2.java | 27 ++-
.../annotation/consumer/tri/HelloServiceV3.java | 27 ++-
.../annotation/provider/DemoServiceImplV1.java | 28 ++-
.../annotation/provider/HelloServiceImplV2.java | 27 ++-
.../annotation/provider/HelloServiceImplV3.java | 28 ++-
.../spring/support/DemoServiceExecutor.java | 25 +--
.../spring/support/HelloServiceExecutor.java | 26 ++-
.../isolation/spring/xml/XmlIsolationTest.java | 56 ++++++
.../META-INF/isolation/dubbo-consumer.xml | 51 ++++++
.../META-INF/isolation/dubbo-provider.xml | 60 +++++++
.../dubbo/remoting/transport/AbstractClient.java | 3 +-
.../dubbo/remoting/transport/AbstractServer.java | 2 +-
.../dispatcher/WrappedChannelHandler.java | 21 ++-
.../apache/dubbo/rpc/protocol/AbstractInvoker.java | 2 +-
.../dubbo/rpc/protocol/dubbo/DubboCodec.java | 19 ++-
.../dubbo/DubboIsolationExecutorSupport.java | 63 +++++++
.../DubboIsolationExecutorSupportFactory.java} | 25 +--
...bo.rpc.executor.IsolationExecutorSupportFactory | 1 +
.../dubbo/rpc/protocol/injvm/InjvmInvoker.java | 2 +-
.../rpc/protocol/tri/TripleHttp2Protocol.java | 4 +-
.../dubbo/rpc/protocol/tri/TripleProtocol.java | 8 +-
.../rpc/protocol/tri/stream/AbstractStream.java | 6 +-
.../transport/TripleHttp2FrameServerHandler.java | 16 +-
.../transport/TripleIsolationExecutorSupport.java | 58 +++++++
.../TripleIsolationExecutorSupportFactory.java} | 25 +--
...bo.rpc.executor.IsolationExecutorSupportFactory | 1 +
.../dubbo/rpc/protocol/tri/TripleInvokerTest.java | 4 +-
59 files changed, 1353 insertions(+), 296 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index 727ee5cc2c..b8709881cd 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -585,6 +585,15 @@ public interface CommonConstants {
*/
String UNLOAD_CLUSTER_RELATED = "unloadClusterRelated";
+ /**
+ * used for thread isolation between services
+ */
+ String SERVICE_EXECUTOR = "service-executor";
+ String EXECUTOR_MANAGEMENT_MODE = "executor-management-mode";
+ String EXECUTOR_MANAGEMENT_MODE_DEFAULT = "default";
+ String EXECUTOR_MANAGEMENT_MODE_ISOLATION = "isolation";
+
+
/**
*
* used in JVMUtil.java ,Control stack print lines, default is 32 lines
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
index c337acaf3e..f1d2c7ebd9 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
@@ -32,6 +32,8 @@ import org.apache.dubbo.config.ProviderConfig;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ModuleModel;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
+import org.apache.dubbo.rpc.executor.DefaultExecutorSupport;
+import org.apache.dubbo.rpc.executor.ExecutorSupport;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -53,7 +55,7 @@ import static
org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
import static
org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;
/**
- * Consider implementing {@code Licycle} to enable executors shutdown when the
process stops.
+ * Consider implementing {@code Lifecycle} to enable executors shutdown when
the process stops.
*/
public class DefaultExecutorRepository implements ExecutorRepository,
ExtensionAccessorAware {
private static final Logger logger =
LoggerFactory.getLogger(DefaultExecutorRepository.class);
@@ -62,13 +64,14 @@ public class DefaultExecutorRepository implements
ExecutorRepository, ExtensionA
private volatile ExecutorService serviceReferExecutor;
- private final ConcurrentMap<String, ConcurrentMap<Integer,
ExecutorService>> data = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, ConcurrentMap<String,
ExecutorService>> data = new ConcurrentHashMap<>();
private final Object LOCK = new Object();
private ExtensionAccessor extensionAccessor;
private final ApplicationModel applicationModel;
private final FrameworkExecutorRepository frameworkExecutorRepository;
+ private ExecutorSupport executorSupport;
private final DataStore dataStore;
@@ -87,31 +90,54 @@ public class DefaultExecutorRepository implements
ExecutorRepository, ExtensionA
@Override
public synchronized ExecutorService createExecutorIfAbsent(URL url) {
String executorKey = getExecutorKey(url);
- Map<Integer, ExecutorService> executors =
data.computeIfAbsent(executorKey, k -> new ConcurrentHashMap<>());
- // Consumer's executor is sharing globally, key=Integer.MAX_VALUE.
Provider's executor is sharing by protocol.
- Integer portKey =
CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE
: url.getPort();
+ Map<String, ExecutorService> executors =
data.computeIfAbsent(executorKey, k -> new ConcurrentHashMap<>());
- String protocol = url.getProtocol();
- if (StringUtils.isEmpty(protocol)) {
- protocol = DEFAULT_PROTOCOL;
- }
+ String executorCacheKey = getExecutorSecondKey(url);
+
+ url = setThreadNameIfAbsent(url, executorCacheKey);
- if (url.getParameter(THREAD_NAME_KEY) == null) {
- url = url.putAttribute(THREAD_NAME_KEY, protocol + "-protocol-" +
portKey);
- }
URL finalUrl = url;
- ExecutorService executor = executors.computeIfAbsent(portKey, k ->
createExecutor(finalUrl));
+ ExecutorService executor = executors.computeIfAbsent(executorCacheKey,
k -> createExecutor(finalUrl));
// If executor has been shut down, create a new one
if (executor.isShutdown() || executor.isTerminated()) {
- executors.remove(portKey);
+ executors.remove(executorCacheKey);
executor = createExecutor(url);
- executors.put(portKey, executor);
+ executors.put(executorCacheKey, executor);
}
- dataStore.put(executorKey, Integer.toString(portKey), executor);
+ dataStore.put(executorKey, executorCacheKey, executor);
return executor;
}
+ protected URL setThreadNameIfAbsent(URL url, String executorCacheKey) {
+ if (url.getParameter(THREAD_NAME_KEY) == null) {
+ String protocol = url.getProtocol();
+ if (StringUtils.isEmpty(protocol)) {
+ protocol = DEFAULT_PROTOCOL;
+ }
+ url = url.putAttribute(THREAD_NAME_KEY, protocol + "-protocol-" +
executorCacheKey);
+ }
+ return url;
+ }
+
+ private String getExecutorSecondKey(URL url) {
+ if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
+ return getConsumerKey(url);
+ } else {
+ return getProviderKey(url);
+ }
+ }
+
+ private String getConsumerKey(URL url) {
+ // Consumer's executor is shared globally, key=Integer.MAX_VALUE
+ return String.valueOf(Integer.MAX_VALUE);
+ }
+
+ protected String getProviderKey(URL url) {
+ // Provider's executor is shared per protocol port (key=port).
+ return String.valueOf(url.getPort());
+ }
+
/**
* Return the executor key based on the type (internal or biz) of the
current service.
*
@@ -133,13 +159,13 @@ public class DefaultExecutorRepository implements
ExecutorRepository, ExtensionA
return executorKey;
}
- private ExecutorService createExecutor(URL url) {
+ protected ExecutorService createExecutor(URL url) {
return (ExecutorService)
extensionAccessor.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
}
@Override
public ExecutorService getExecutor(URL url) {
- Map<Integer, ExecutorService> executors =
data.get(getExecutorKey(url));
+ Map<String, ExecutorService> executors = data.get(getExecutorKey(url));
/*
* It's guaranteed that this method is called after {@link
#createExecutorIfAbsent(URL)}, so data should already
@@ -153,10 +179,10 @@ public class DefaultExecutorRepository implements
ExecutorRepository, ExtensionA
}
// Consumer's executor is sharing globally, key=Integer.MAX_VALUE.
Provider's executor is sharing by protocol.
- Integer portKey =
CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE
: url.getPort();
- ExecutorService executor = executors.get(portKey);
+ String executorCacheKey = getExecutorSecondKey(url);
+ ExecutorService executor = executors.get(executorCacheKey);
if (executor != null && (executor.isShutdown() ||
executor.isTerminated())) {
- executors.remove(portKey);
+ executors.remove(executorCacheKey);
// Does not re-create a shutdown executor, use SHARED_EXECUTOR for
downgrade.
executor = null;
logger.info("Executor for " + url + " is shutdown.");
@@ -408,4 +434,12 @@ public class DefaultExecutorRepository implements
ExecutorRepository, ExtensionA
public ExecutorService getMappingRefreshingExecutor() {
return frameworkExecutorRepository.getMappingRefreshingExecutor();
}
+
+ @Override
+ public ExecutorSupport getExecutorSupport(URL url) {
+ if (executorSupport == null) {
+ executorSupport = new DefaultExecutorSupport(url);
+ }
+ return executorSupport;
+ }
}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
index fa8026b890..e9e13b77c7 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
@@ -17,12 +17,20 @@
package org.apache.dubbo.common.threadpool.manager;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.extension.ExtensionScope;
import org.apache.dubbo.common.extension.SPI;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.executor.ExecutorSupport;
+import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+
/**
*
*/
@@ -168,4 +176,18 @@ public interface ExecutorRepository {
*/
@Deprecated
ExecutorService getMappingRefreshingExecutor();
+
+ ExecutorSupport getExecutorSupport(URL url);
+
+ static ExecutorRepository getInstance(ApplicationModel applicationModel) {
+ ExtensionLoader<ExecutorRepository> extensionLoader =
applicationModel.getExtensionLoader(ExecutorRepository.class);
+ String mode = getMode(applicationModel);
+ return StringUtils.isNotEmpty(mode) ?
extensionLoader.getExtension(mode) : extensionLoader.getDefaultExtension();
+ }
+
+ static String getMode(ApplicationModel applicationModel) {
+ Optional<ApplicationConfig> optional =
applicationModel.getApplicationConfigManager().getApplication();
+ return
optional.map(ApplicationConfig::getExecutorManagementMode).orElse(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+ }
+
}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/IsolationExecutorRepository.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/IsolationExecutorRepository.java
new file mode 100644
index 0000000000..5d6701b4d1
--- /dev/null
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/IsolationExecutorRepository.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.threadpool.manager;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.executor.ExecutorSupport;
+import org.apache.dubbo.rpc.executor.IsolationExecutorSupportFactory;
+
+import java.util.concurrent.ExecutorService;
+
+import static
org.apache.dubbo.common.constants.CommonConstants.SERVICE_EXECUTOR;
+import static
org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;
+
+/**
+ * Thread pool isolation between services: each service has its own
thread pool, so services do not interfere with each other.
+ */
+public class IsolationExecutorRepository extends DefaultExecutorRepository {
+
+ public IsolationExecutorRepository(ApplicationModel applicationModel) {
+ super(applicationModel);
+ }
+
+ @Override
+ protected URL setThreadNameIfAbsent(URL url, String executorCacheKey) {
+ if (url.getParameter(THREAD_NAME_KEY) == null) {
+ url = url.putAttribute(THREAD_NAME_KEY, "isolation-" +
executorCacheKey);
+ }
+ return url;
+ }
+
+ @Override
+ protected String getProviderKey(URL url) {
+ return url.getServiceKey();
+ }
+
+ @Override
+ protected ExecutorService createExecutor(URL url) {
+ Object executor = url.getAttributes().get(SERVICE_EXECUTOR);
+ if (executor != null && executor instanceof ExecutorService) {
+ return (ExecutorService) executor;
+ }
+ return super.createExecutor(url);
+ }
+
+ @Override
+ public ExecutorSupport getExecutorSupport(URL url) {
+ return
IsolationExecutorSupportFactory.getIsolationExecutorSupport(url);
+ }
+}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/NamedThreadFactory.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/NamedThreadFactory.java
index 7d887fda34..a7ea031a9b 100755
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/NamedThreadFactory.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/NamedThreadFactory.java
@@ -60,4 +60,9 @@ public class NamedThreadFactory implements ThreadFactory {
public ThreadGroup getThreadGroup() {
return mGroup;
}
+
+ // for test
+ public AtomicInteger getThreadNum() {
+ return mThreadNum;
+ }
}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java
b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java
index e5f00d9a57..de40f45352 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java
@@ -24,9 +24,11 @@ import org.apache.dubbo.rpc.model.ModuleModel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.Executor;
import static
org.apache.dubbo.common.constants.CommonConstants.EXPORTER_LISTENER_KEY;
import static
org.apache.dubbo.common.constants.CommonConstants.EXPORT_ASYNC_KEY;
+import static
org.apache.dubbo.common.constants.CommonConstants.SERVICE_EXECUTOR;
import static
org.apache.dubbo.common.constants.CommonConstants.SERVICE_FILTER_KEY;
import static
org.apache.dubbo.common.constants.ProviderConstants.DEFAULT_PREFER_SERIALIZATION;
@@ -143,6 +145,11 @@ public abstract class AbstractServiceConfig extends
AbstractInterfaceConfig {
@Deprecated
private Boolean exportAsync;
+ /**
+ * used for thread pool isolation between services
+ */
+ private Executor executor;
+
public AbstractServiceConfig() {
}
@@ -359,4 +366,14 @@ public abstract class AbstractServiceConfig extends
AbstractInterfaceConfig {
public void setExportAsync(Boolean exportAsync) {
this.exportAsync = exportAsync;
}
+
+ public void setExecutor(Executor executor) {
+ this.executor = executor;
+ }
+
+ @Parameter(key = SERVICE_EXECUTOR)
+ public Executor getExecutor() {
+ return executor;
+ }
+
}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/config/ApplicationConfig.java
b/dubbo-common/src/main/java/org/apache/dubbo/config/ApplicationConfig.java
index 4e7a601bd1..3abf10682d 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/ApplicationConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/ApplicationConfig.java
@@ -21,6 +21,8 @@ import static
org.apache.dubbo.common.constants.CommonConstants.APPLICATION_PROT
import static
org.apache.dubbo.common.constants.CommonConstants.APPLICATION_VERSION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
import static org.apache.dubbo.common.constants.CommonConstants.DUMP_DIRECTORY;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
import static org.apache.dubbo.common.constants.CommonConstants.HOST_KEY;
import static
org.apache.dubbo.common.constants.CommonConstants.LIVENESS_PROBE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.METADATA_KEY;
@@ -203,6 +205,11 @@ public class ApplicationConfig extends AbstractConfig {
private Boolean enableEmptyProtection;
+ /**
+ * thread pool management: default/isolation
+ */
+ private String executorManagementMode;
+
public ApplicationConfig() {
}
@@ -233,6 +240,9 @@ public class ApplicationConfig extends AbstractConfig {
hostname = "UNKNOWN";
}
}
+ if (executorManagementMode == null) {
+ executorManagementMode = EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+ }
}
@Parameter(key = APPLICATION_KEY, required = true)
@@ -575,6 +585,15 @@ public class ApplicationConfig extends AbstractConfig {
this.startupProbe = startupProbe;
}
+ public void setExecutorManagementMode(String executorManagementMode) {
+ this.executorManagementMode = executorManagementMode;
+ }
+
+ @Parameter(key = EXECUTOR_MANAGEMENT_MODE)
+ public String getExecutorManagementMode() {
+ return executorManagementMode;
+ }
+
@Override
public void refresh() {
super.refresh();
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboService.java
b/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboService.java
index acbd341696..c13c6c97e9 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboService.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboService.java
@@ -317,4 +317,10 @@ public @interface DubboService {
* Weather the service is export asynchronously
*/
boolean exportAsync() default false;
+
+ /**
+ * Bean name of the service executor (thread pool), used for thread pool
isolation between services.
+ * @return the executor bean name, defaulting to an empty string
+ */
+ String executor() default "";
}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/AbstractIsolationExecutorSupport.java
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/AbstractIsolationExecutorSupport.java
new file mode 100644
index 0000000000..1140fee973
--- /dev/null
+++
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/AbstractIsolationExecutorSupport.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.executor;
+
+import org.apache.dubbo.common.ServiceKey;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.resource.GlobalResourcesRepository;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.url.component.ServiceConfigURL;
+import org.apache.dubbo.common.utils.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+
+public abstract class AbstractIsolationExecutorSupport implements
ExecutorSupport {
+ private final URL url;
+ private final ExecutorRepository executorRepository;
+ private final Map<String, Executor> executorMap;
+
+ public AbstractIsolationExecutorSupport(URL url) {
+ this.url = url;
+ this.executorRepository =
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel());
+ this.executorMap = new HashMap<>();
+
GlobalResourcesRepository.getInstance().registerDisposable(this::destroy);
+ }
+
+ public Executor getExecutor(Object data) {
+
+ ServiceKey serviceKey = getServiceKey(data);
+ if (!isValid(serviceKey)) {
+ return null;
+ }
+ String interfaceName = serviceKey.getInterfaceName();
+ String version = serviceKey.getVersion();
+ String group = serviceKey.getGroup();
+ String cachedKey = URL.buildKey(interfaceName, group, version);
+ if (executorMap.containsKey(cachedKey)) {
+ return executorMap.get(cachedKey);
+ }
+
+ synchronized (this) {
+ if (executorMap.containsKey(cachedKey)) {
+ return executorMap.get(cachedKey);
+ }
+ Map<String, String> parameters = url.getParameters();
+ parameters.put(GROUP_KEY, group);
+ parameters.put(INTERFACE_KEY, interfaceName);
+ parameters.put(VERSION_KEY, version);
+ ServiceConfigURL tmpURL = new ServiceConfigURL(url.getProtocol(),
url.getHost(), url.getPort(), interfaceName, parameters);
+ ExecutorService executor = executorRepository.getExecutor(tmpURL);
+ executorMap.put(cachedKey, executor);
+ return executor;
+ }
+ }
+
+ public synchronized void destroy() {
+ executorMap.clear();
+ }
+
+ private boolean isValid(ServiceKey serviceKey) {
+ return serviceKey != null &&
StringUtils.isNotEmpty(serviceKey.getInterfaceName());
+ }
+
+ protected abstract ServiceKey getServiceKey(Object data);
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/DefaultExecutorSupport.java
similarity index 60%
copy from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
copy to
dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/DefaultExecutorSupport.java
index b9496c0c1a..68a670dd88 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/DefaultExecutorSupport.java
@@ -14,24 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.dubbo.rpc.executor;
-package org.apache.dubbo.rpc.protocol.tri.stream;
-
-import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
-import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import java.util.concurrent.Executor;
-/**
- * An abstract stream implementation.
- */
-public abstract class AbstractStream implements Stream {
+public class DefaultExecutorSupport implements ExecutorSupport {
+ private ExecutorRepository executorRepository;
+ private URL url;
- protected final Executor executor;
- protected final FrameworkModel frameworkModel;
+ public DefaultExecutorSupport(URL url) {
+ this.url = url;
+ this.executorRepository =
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel());
+ }
- public AbstractStream(Executor executor, FrameworkModel frameworkModel) {
- this.executor = new SerializingExecutor(executor);
- this.frameworkModel = frameworkModel;
+ public Executor getExecutor(Object data) {
+ return executorRepository.getExecutor(url);
}
+
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/ExecutorSupport.java
similarity index 60%
copy from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
copy to
dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/ExecutorSupport.java
index b9496c0c1a..c06a802ac2 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/ExecutorSupport.java
@@ -14,24 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.dubbo.rpc.protocol.tri.stream;
-
-import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
-import org.apache.dubbo.rpc.model.FrameworkModel;
+package org.apache.dubbo.rpc.executor;
import java.util.concurrent.Executor;
-/**
- * An abstract stream implementation.
- */
-public abstract class AbstractStream implements Stream {
-
- protected final Executor executor;
- protected final FrameworkModel frameworkModel;
-
- public AbstractStream(Executor executor, FrameworkModel frameworkModel) {
- this.executor = new SerializingExecutor(executor);
- this.frameworkModel = frameworkModel;
- }
+public interface ExecutorSupport {
+ Executor getExecutor(Object data);
}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/IsolationExecutorSupportFactory.java
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/IsolationExecutorSupportFactory.java
new file mode 100644
index 0000000000..b610e9ac93
--- /dev/null
+++
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/IsolationExecutorSupportFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.executor;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.Adaptive;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.extension.SPI;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
+
+@SPI
+public interface IsolationExecutorSupportFactory {
+
+ @Adaptive(PROTOCOL_KEY)
+ ExecutorSupport createIsolationExecutorSupport(URL url);
+
+ static ExecutorSupport getIsolationExecutorSupport(URL url) {
+ ApplicationModel applicationModel = url.getOrDefaultApplicationModel();
+ ExtensionLoader<IsolationExecutorSupportFactory> extensionLoader =
applicationModel.getExtensionLoader(IsolationExecutorSupportFactory.class);
+ IsolationExecutorSupportFactory factory =
extensionLoader.getAdaptiveExtension();
+ return factory.createIsolationExecutorSupport(url);
+ }
+
+}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ApplicationModel.java
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ApplicationModel.java
index 446d3d16fc..a2cd0b84d9 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ApplicationModel.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ApplicationModel.java
@@ -313,7 +313,7 @@ public class ApplicationModel extends ScopeModel {
}
public ExecutorRepository getApplicationExecutorRepository() {
- return
this.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+ return ExecutorRepository.getInstance(this);
}
public ApplicationConfig getCurrentConfig() {
diff --git
a/dubbo-common/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.threadpool.manager.ExecutorRepository
b/dubbo-common/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.threadpool.manager.ExecutorRepository
index 44199b0219..baec5ac5df 100644
---
a/dubbo-common/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.threadpool.manager.ExecutorRepository
+++
b/dubbo-common/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.threadpool.manager.ExecutorRepository
@@ -1 +1,2 @@
-default=org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository
\ No newline at end of file
+default=org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository
+isolation=org.apache.dubbo.common.threadpool.manager.IsolationExecutorRepository
diff --git
a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java
b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java
index 657d21eaeb..91a92f6cb8 100644
---
a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java
+++
b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java
@@ -36,7 +36,7 @@ public class ExecutorRepositoryTest {
@BeforeEach
public void setup() {
applicationModel = FrameworkModel.defaultModel().newApplication();
- executorRepository =
applicationModel.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+ executorRepository = ExecutorRepository.getInstance(applicationModel);
}
@AfterEach
diff --git
a/dubbo-common/src/test/java/org/apache/dubbo/metadata/definition/ServiceDefinitionBuilderTest.java
b/dubbo-common/src/test/java/org/apache/dubbo/metadata/definition/ServiceDefinitionBuilderTest.java
index afdead8f56..6d59887d18 100644
---
a/dubbo-common/src/test/java/org/apache/dubbo/metadata/definition/ServiceDefinitionBuilderTest.java
+++
b/dubbo-common/src/test/java/org/apache/dubbo/metadata/definition/ServiceDefinitionBuilderTest.java
@@ -22,7 +22,10 @@ import
org.apache.dubbo.metadata.definition.model.TypeDefinition;
import org.apache.dubbo.metadata.definition.service.ComplexObject;
import org.apache.dubbo.metadata.definition.service.DemoService;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@@ -33,6 +36,20 @@ import java.util.List;
*/
public class ServiceDefinitionBuilderTest {
+
+ private static FrameworkModel frameworkModel;
+
+ /**
+ * Creates a dedicated FrameworkModel once for the whole test class and
+ * initializes the TypeDefinitionBuilder builders against it.
+ */
+ @BeforeAll
+ public static void setup() {
+ frameworkModel = new FrameworkModel();
+ TypeDefinitionBuilder.initBuilders(frameworkModel);
+ }
+
+ /**
+ * Destroys the FrameworkModel created in setup() once all tests have run.
+ */
+ @AfterAll
+ public static void clear() {
+ frameworkModel.destroy();
+ }
+
@Test
public void testBuilderComplexObject() {
FullServiceDefinition fullServiceDefinition =
ServiceDefinitionBuilder.buildFullDefinition(DemoService.class);
diff --git
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
index 054f67949b..77405a5b78 100644
---
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
+++
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
@@ -64,11 +64,13 @@ import static
org.apache.dubbo.common.constants.CommonConstants.ANYHOST_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
import static
org.apache.dubbo.common.constants.CommonConstants.DUBBO_IP_TO_BIND;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_ISOLATION;
import static
org.apache.dubbo.common.constants.CommonConstants.LOCALHOST_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE;
import static org.apache.dubbo.common.constants.CommonConstants.REVISION_KEY;
+import static
org.apache.dubbo.common.constants.CommonConstants.SERVICE_EXECUTOR;
import static
org.apache.dubbo.common.constants.CommonConstants.SERVICE_NAME_MAPPING_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.CONFIG_NO_METHOD_FOUND;
@@ -244,7 +246,7 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
}
protected void doDelayExport() {
-
getScopeModel().getDefaultExtension(ExecutorRepository.class).getServiceExportExecutor()
+
ExecutorRepository.getInstance(getScopeModel().getApplicationModel()).getServiceExportExecutor()
.schedule(() -> {
try {
doExport();
@@ -419,9 +421,27 @@ public class ServiceConfig<T> extends ServiceConfigBase<T>
{
URL url = buildUrl(protocolConfig, map);
+ processServiceExecutor(url);
+
exportUrl(url, registryURLs);
}
+ /**
+ * Attaches the service-level executor, if one is configured, to the
+ * attributes of the URL being exported. Does nothing when no executor is
+ * configured; logs a warning and leaves the URL untouched when the
+ * application's executor management mode is not the isolation mode.
+ *
+ * @param url the URL built for this service export
+ */
+ private void processServiceExecutor(URL url) {
+ if (getExecutor() != null) {
+ String mode = application.getExecutorManagementMode();
+ if (!EXECUTOR_MANAGEMENT_MODE_ISOLATION.equals(mode)) {
+ logger.warn("The current executor management mode is " + mode +
+ ", the configured service executor cannot take effect
unless the mode is configured as " + EXECUTOR_MANAGEMENT_MODE_ISOLATION);
+ return;
+ }
+ /*
+ * The executor is not a String, so it cannot be carried as a URL
+ * parameter; it is stored in URL#attributes instead and is read back
+ * in IsolationExecutorRepository#createExecutor.
+ */
+ url.getAttributes().put(SERVICE_EXECUTOR, getExecutor());
+ }
+ }
+
private Map<String, String> buildAttributes(ProtocolConfig protocolConfig)
{
Map<String, String> map = new HashMap<String, String>();
diff --git
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
index 368de9ed33..ea88fd99d7 100644
---
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
+++
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
@@ -162,7 +162,7 @@ public final class DubboBootstrap {
configManager = applicationModel.getApplicationConfigManager();
environment = applicationModel.getModelEnvironment();
- executorRepository =
applicationModel.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+ executorRepository = ExecutorRepository.getInstance(applicationModel);
applicationDeployer = applicationModel.getDeployer();
// listen deploy events
applicationDeployer.addDeployListener(new
DeployListenerAdapter<ApplicationModel>() {
diff --git
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/builders/AbstractServiceBuilder.java
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/builders/AbstractServiceBuilder.java
index c3ddc76e66..f1f4e4fd66 100644
---
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/builders/AbstractServiceBuilder.java
+++
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/builders/AbstractServiceBuilder.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.config.ProtocolConfig;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Executor;
/**
* AbstractBuilder
@@ -107,6 +108,11 @@ public abstract class AbstractServiceBuilder<T extends
AbstractServiceConfig, B
*/
private String serialization;
+ /**
+ * used for thread pool isolation between services
+ */
+ private Executor executor;
+
/**
* The prefer serialization type
*/
@@ -221,6 +227,11 @@ public abstract class AbstractServiceBuilder<T extends
AbstractServiceConfig, B
return getThis();
}
+ /**
+ * Sets the executor (thread pool) used for thread pool isolation between services.
+ *
+ * @param executor the executor to apply to the service config being built
+ * @return the result of getThis(), so calls can be chained
+ */
+ public B executor(Executor executor) {
+ this.executor = executor;
+ return getThis();
+ }
+
/**
* The prefer serialization type
*
@@ -284,6 +295,9 @@ public abstract class AbstractServiceBuilder<T extends
AbstractServiceConfig, B
if (!StringUtils.isEmpty(serialization)) {
instance.setSerialization(serialization);
}
+ if (executor != null) {
+ instance.setExecutor(executor);
+ }
if (StringUtils.isNotBlank(preferSerialization)) {
instance.setPreferSerialization(preferSerialization);
}
diff --git
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultApplicationDeployer.java
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultApplicationDeployer.java
index edbfc0da78..756e726280 100644
---
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultApplicationDeployer.java
+++
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultApplicationDeployer.java
@@ -127,7 +127,7 @@ public class DefaultApplicationDeployer extends
AbstractDeployer<ApplicationMode
referenceCache = new CompositeReferenceCache(applicationModel);
frameworkExecutorRepository =
applicationModel.getFrameworkModel().getBeanFactory().getBean(FrameworkExecutorRepository.class);
- executorRepository =
getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+ executorRepository = ExecutorRepository.getInstance(applicationModel);
dubboShutdownHook = new DubboShutdownHook(applicationModel);
// load spi listener
@@ -1072,7 +1072,7 @@ public class DefaultApplicationDeployer extends
AbstractDeployer<ApplicationMode
// shutdown export/refer executor
executorRepository.shutdownServiceExportExecutor();
executorRepository.shutdownServiceReferExecutor();
-
getExtensionLoader(ExecutorRepository.class).getDefaultExtension().destroyAll();
+ ExecutorRepository.getInstance(applicationModel).destroyAll();
}
private void destroyRegistries() {
diff --git
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultModuleDeployer.java
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultModuleDeployer.java
index f692dcd11b..3004b6b005 100644
---
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultModuleDeployer.java
+++
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultModuleDeployer.java
@@ -87,7 +87,7 @@ public class DefaultModuleDeployer extends
AbstractDeployer<ModuleModel> impleme
this.moduleModel = moduleModel;
configManager = moduleModel.getConfigManager();
frameworkExecutorRepository =
moduleModel.getApplicationModel().getFrameworkModel().getBeanFactory().getBean(FrameworkExecutorRepository.class);
- executorRepository =
moduleModel.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+ executorRepository =
ExecutorRepository.getInstance(moduleModel.getApplicationModel());
referenceCache = SimpleReferenceCache.newCache();
applicationDeployer = DefaultApplicationDeployer.get(moduleModel);
diff --git
a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/beans/factory/annotation/ServiceAnnotationPostProcessor.java
b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/beans/factory/annotation/ServiceAnnotationPostProcessor.java
index 1a490aabf3..95dc4b5457 100644
---
a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/beans/factory/annotation/ServiceAnnotationPostProcessor.java
+++
b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/beans/factory/annotation/ServiceAnnotationPostProcessor.java
@@ -429,7 +429,7 @@ public class ServiceAnnotationPostProcessor implements
BeanDefinitionRegistryPos
MutablePropertyValues propertyValues =
beanDefinition.getPropertyValues();
String[] ignoreAttributeNames = of("provider", "monitor",
"application", "module", "registry", "protocol",
- "methods", "interfaceName", "parameters");
+ "methods", "interfaceName", "parameters", "executor");
propertyValues.addPropertyValues(new
AnnotationPropertyValuesAdapter(serviceAnnotationAttributes, environment,
ignoreAttributeNames));
@@ -480,6 +480,11 @@ public class ServiceAnnotationPostProcessor implements
BeanDefinitionRegistryPos
addPropertyReference(builder, "module", moduleConfigId);
}
+ String executorBeanName = (String)
serviceAnnotationAttributes.get("executor");
+ if (StringUtils.hasText(executorBeanName)) {
+ addPropertyReference(builder, "executor", executorBeanName);
+ }
+
return builder.getBeanDefinition();
}
diff --git
a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/schema/DubboBeanDefinitionParser.java
b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/schema/DubboBeanDefinitionParser.java
index d426e56430..832c91f0c4 100644
---
a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/schema/DubboBeanDefinitionParser.java
+++
b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/schema/DubboBeanDefinitionParser.java
@@ -81,6 +81,7 @@ public class DubboBeanDefinitionParser implements
BeanDefinitionParser {
private static final String ONRETURN = "onreturn";
private static final String ONTHROW = "onthrow";
private static final String ONINVOKE = "oninvoke";
+ private static final String EXECUTOR = "executor";
private static final String METHOD = "Method";
private static final String BEAN_NAME = "BEAN_NAME";
private static boolean resolvePlaceholdersEnabled = true;
@@ -193,6 +194,8 @@ public class DubboBeanDefinitionParser implements
BeanDefinitionParser {
String method = value.substring(index + 1);
reference = new RuntimeBeanReference(ref);
beanDefinition.getPropertyValues().addPropertyValue(property + METHOD, method);
+ } else if (EXECUTOR.equals(property)){
+ reference = new RuntimeBeanReference(value);
} else {
if ("ref".equals(property) &&
parserContext.getRegistry().containsBeanDefinition(value)) {
BeanDefinition refBean =
parserContext.getRegistry().getBeanDefinition(value);
diff --git
a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
index ca3a573d17..eeb0d91f22 100644
---
a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
+++
b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
@@ -348,6 +348,12 @@
<xsd:documentation><![CDATA[ The serialization
protocol of service. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="executor" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation>
+ <![CDATA[ Bean name of service executor(thread
pool), used for thread pool isolation between services. ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
<xsd:anyAttribute namespace="##other" processContents="lax"/>
</xsd:extension>
</xsd:complexContent>
@@ -449,6 +455,11 @@
<xsd:documentation><![CDATA[ The preferred protocol to use,
set protocol name. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="executor-management-mode" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[ Thread pool management:
default/isolation. ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
</xsd:complexType>
<xsd:complexType name="moduleType">
diff --git
a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
index 5c399f5a62..860b95053d 100644
--- a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
+++ b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
@@ -360,6 +360,12 @@
<![CDATA[ Weather the service is export
asynchronously, default false. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="executor" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation>
+ <![CDATA[ Bean name of service executor(thread
pool), used for thread pool isolation between services. ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
<xsd:anyAttribute namespace="##other" processContents="lax"/>
</xsd:extension>
</xsd:complexContent>
@@ -456,6 +462,11 @@
<xsd:documentation><![CDATA[ The preferred protocol to use,
set protocol name. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="executor-management-mode" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[ Thread pool management:
default/isolation. ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
</xsd:complexType>
<xsd:complexType name="moduleType">
diff --git
a/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/api/ApiIsolationTest.java
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/api/ApiIsolationTest.java
new file mode 100644
index 0000000000..ae7a5be361
--- /dev/null
+++
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/api/ApiIsolationTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.config.spring.isolation.api;
+
+import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.threadpool.manager.IsolationExecutorRepository;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.ProtocolConfig;
+import org.apache.dubbo.config.RegistryConfig;
+import org.apache.dubbo.config.ServiceConfig;
+import org.apache.dubbo.config.bootstrap.DubboBootstrap;
+import org.apache.dubbo.config.spring.api.DemoService;
+import org.apache.dubbo.config.spring.api.HelloService;
+import org.apache.dubbo.config.spring.impl.DemoServiceImpl;
+import org.apache.dubbo.config.spring.impl.HelloServiceImpl;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import
org.apache.dubbo.test.check.registrycenter.config.ZookeeperRegistryCenterConfig;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_ISOLATION;
+
+/**
+ * Verifies provider-side thread pool isolation when configured through the
+ * API: three services are exported over the tri and dubbo protocols, two of
+ * them with their own executor, and the test checks which executor serves
+ * each service.
+ */
+public class ApiIsolationTest {
+ private static RegistryConfig registryConfig;
+
+ /**
+ * Destroys every existing FrameworkModel and prepares the registry config
+ * shared by the provider and the consumers.
+ */
+ @BeforeAll
+ public static void beforeAll() {
+ FrameworkModel.destroyAll();
+ registryConfig = new
RegistryConfig(ZookeeperRegistryCenterConfig.getConnectionAddress1());
+ }
+
+ /**
+ * Destroys every FrameworkModel created while the test was running.
+ */
+ @AfterAll
+ public static void afterAll() throws Exception {
+ FrameworkModel.destroyAll();
+ }
+
+ private String version1 = "1.0";
+ private String version2 = "2.0";
+ private String version3 = "3.0";
+
+
+ /**
+ * Exports three services in isolation mode, checks the executor resolved
+ * for each of them, then invokes them over dubbo and tri and checks the
+ * thread counters of the three thread factories.
+ */
+ @Test
+ public void test() throws Exception {
+
+
+ DubboBootstrap providerBootstrap = null;
+ DubboBootstrap consumerBootstrap1 = null;
+ DubboBootstrap consumerBootstrap2 = null;
+
+ try {
+
+ // provider app
+ providerBootstrap = DubboBootstrap.newInstance();
+
+ ServiceConfig serviceConfig1 = new ServiceConfig();
+ serviceConfig1.setInterface(DemoService.class);
+ serviceConfig1.setRef(new DemoServiceImpl());
+ serviceConfig1.setVersion(version1);
+ // set executor1 for serviceConfig1, max threads is 10
+ NamedThreadFactory threadFactory1 = new
NamedThreadFactory("DemoService-executor");
+ ExecutorService executor1 = Executors.newFixedThreadPool(10,
threadFactory1);
+ serviceConfig1.setExecutor(executor1);
+
+ ServiceConfig serviceConfig2 = new ServiceConfig();
+ serviceConfig2.setInterface(HelloService.class);
+ serviceConfig2.setRef(new HelloServiceImpl());
+ serviceConfig2.setVersion(version2);
+ // set executor2 for serviceConfig2, max threads is 100
+ NamedThreadFactory threadFactory2 = new
NamedThreadFactory("HelloService-executor");
+ ExecutorService executor2 = Executors.newFixedThreadPool(100,
threadFactory2);
+ serviceConfig2.setExecutor(executor2);
+
+ ServiceConfig serviceConfig3 = new ServiceConfig();
+ serviceConfig3.setInterface(HelloService.class);
+ serviceConfig3.setRef(new HelloServiceImpl());
+ serviceConfig3.setVersion(version3);
+ // Because executor is not set for serviceConfig3, the default
executor of serviceConfig3 is built using
+ // the threadpool parameter of the protocolConfig (
FixedThreadpool , max threads is 200)
+ serviceConfig3.setExecutor(null);
+
+ // It takes effect only if [executor-management-mode=isolation] is
configured
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_ISOLATION);
+
+ providerBootstrap
+ .application(applicationConfig)
+ .registry(registryConfig)
+ // export with tri and dubbo protocol
+ .protocol(new ProtocolConfig("tri", 20001))
+ .protocol(new ProtocolConfig("dubbo", 20002))
+ .service(serviceConfig1)
+ .service(serviceConfig2)
+ .service(serviceConfig3);
+
+ providerBootstrap.start();
+
+ // Verify that the executor is the previously configured
+ ApplicationModel applicationModel =
providerBootstrap.getApplicationModel();
+ ExecutorRepository repository =
ExecutorRepository.getInstance(applicationModel);
+ Assertions.assertTrue(repository instanceof
IsolationExecutorRepository);
+ Assertions.assertEquals(executor1,
repository.getExecutor(serviceConfig1.toUrl()));
+ Assertions.assertEquals(executor2,
repository.getExecutor(serviceConfig2.toUrl()));
+ // the default executor of serviceConfig3 is built using the
threadpool parameter of the protocol
+ ThreadPoolExecutor executor3 = (ThreadPoolExecutor)
repository.getExecutor(serviceConfig3.toUrl());
+ Assertions.assertTrue(executor3.getThreadFactory() instanceof
NamedInternalThreadFactory);
+ NamedInternalThreadFactory threadFactory3 =
(NamedInternalThreadFactory) executor3.getThreadFactory();
+
+ // consumer app start with dubbo protocol and rpc call
+ consumerBootstrap1 = configConsumerBootstrapWithProtocol("dubbo");
+ rpcInvoke(consumerBootstrap1);
+
+ // consumer app start with tri protocol and rpc call
+ consumerBootstrap2 = configConsumerBootstrapWithProtocol("tri");
+ rpcInvoke(consumerBootstrap2);
+
+ // Verify that when the provider accepts different service
requests,
+ // whether to use the respective executor(threadFactory) of
different services to create threads
+ AtomicInteger threadNum1 = threadFactory1.getThreadNum();
+ AtomicInteger threadNum2 = threadFactory2.getThreadNum();
+ AtomicInteger threadNum3 = threadFactory3.getThreadNum();
+ // assertEquals takes (expected, actual); the expected literal goes first
+ // so that a failure message reports the two values the right way round.
+ // NOTE(review): each expected value is the pool size plus one, presumably
+ // because the counter is one past the threads created - confirm against NamedThreadFactory.
+ Assertions.assertEquals(11, threadNum1.get());
+ Assertions.assertEquals(101, threadNum2.get());
+ Assertions.assertEquals(201, threadNum3.get());
+
+ } finally {
+ if (providerBootstrap != null) {
+ providerBootstrap.destroy();
+ }
+ if (consumerBootstrap1 != null) {
+ consumerBootstrap1.destroy();
+ }
+ if (consumerBootstrap2 != null) {
+ consumerBootstrap2.destroy();
+ }
+ }
+ }
+
+ /**
+ * Looks up the three service references in the consumer bootstrap's cache
+ * and calls each of them 250 times.
+ */
+ private void rpcInvoke(DubboBootstrap consumerBootstrap) {
+ DemoService demoServiceV1 =
consumerBootstrap.getCache().get(DemoService.class.getName() + ":" + version1);
+ HelloService helloServiceV2 =
consumerBootstrap.getCache().get(HelloService.class.getName() + ":" + version2);
+ HelloService helloServiceV3 =
consumerBootstrap.getCache().get(HelloService.class.getName() + ":" + version3);
+ for (int i = 0; i < 250; i++) {
+ demoServiceV1.sayName("name, version = " + version1);
+ }
+ for (int i = 0; i < 250; i++) {
+ helloServiceV2.sayHello("hello, version = " + version2);
+ }
+ for (int i = 0; i < 250; i++) {
+ helloServiceV3.sayHello("hello, version = " + version3);
+ }
+ }
+
+ /**
+ * Creates and starts a consumer bootstrap that references the three
+ * service versions over the given protocol with injvm disabled.
+ *
+ * @param protocol protocol name passed to every reference builder
+ * @return the started consumer bootstrap
+ */
+ private DubboBootstrap configConsumerBootstrapWithProtocol(String
protocol) {
+ DubboBootstrap consumerBootstrap;
+ consumerBootstrap = DubboBootstrap.newInstance();
+ consumerBootstrap.application("consumer-app")
+ .registry(registryConfig)
+ .reference(builder ->
builder.interfaceClass(DemoService.class).version(version1).protocol(protocol).injvm(false))
+ .reference(builder ->
builder.interfaceClass(HelloService.class).version(version2).protocol(protocol).injvm(false))
+ .reference(builder ->
builder.interfaceClass(HelloService.class).version(version3).protocol(protocol).injvm(false));
+ consumerBootstrap.start();
+ return consumerBootstrap;
+ }
+
+}
diff --git
a/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/BaseTest.java
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/BaseTest.java
new file mode 100644
index 0000000000..eb6c121c8f
--- /dev/null
+++
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/BaseTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.config.spring.isolation.spring;
+
+import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.threadpool.manager.IsolationExecutorRepository;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.config.ServiceConfig;
+import org.apache.dubbo.config.spring.api.DemoService;
+import org.apache.dubbo.config.spring.api.HelloService;
+import
org.apache.dubbo.config.spring.isolation.spring.support.DemoServiceExecutor;
+import
org.apache.dubbo.config.spring.isolation.spring.support.HelloServiceExecutor;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.context.ApplicationContext;
+
+import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Shared assertions for the Spring-based thread pool isolation tests.
+ * The three serviceConfig fields are never assigned in this class;
+ * presumably subclasses set them before calling assertExecutor - confirm
+ * against the subclasses.
+ */
+public abstract class BaseTest {
+
+ protected ServiceConfig serviceConfig1;
+ protected ServiceConfig serviceConfig2;
+ protected ServiceConfig serviceConfig3;
+
+ // NOTE(review): this method calls itself, so it only terminates when a
+ // subclass overrides it (AnnotationIsolationTest does). Consider declaring
+ // it abstract once every subclass is confirmed to override it.
+ @Test
+ public void test() throws Exception {
+ test();
+ }
+
+ /**
+ * Checks that the provider resolves the configured executor for each
+ * service, invokes the services over dubbo and tri, and then checks the
+ * thread counters of the three thread factories.
+ *
+ * @param providerContext Spring context holding the executor beans and the ApplicationModel
+ * @param consumerContext Spring context holding the service reference beans
+ */
+ protected void assertExecutor(ApplicationContext providerContext,
ApplicationContext consumerContext) {
+
+ // find configured "executor-demo-service" executor
+ Map<String, DemoServiceExecutor> beansOfType1 =
providerContext.getBeansOfType(DemoServiceExecutor.class);
+ ThreadPoolExecutor executor1 =
beansOfType1.get("executor-demo-service");
+ NamedThreadFactory threadFactory1 = (NamedThreadFactory)
executor1.getThreadFactory();
+
+ // find configured "executor-hello-service" executor
+ Map<String, HelloServiceExecutor> beansOfType2 =
providerContext.getBeansOfType(HelloServiceExecutor.class);
+ ThreadPoolExecutor executor2 =
beansOfType2.get("executor-hello-service");
+ NamedThreadFactory threadFactory2 = (NamedThreadFactory)
executor2.getThreadFactory();
+
+ // Verify that the executor is the previously configured
+ Map<String, ApplicationModel> applicationModelMap =
providerContext.getBeansOfType(ApplicationModel.class);
+ ApplicationModel applicationModel =
applicationModelMap.get(ApplicationModel.class.getName());
+ ExecutorRepository repository =
ExecutorRepository.getInstance(applicationModel);
+ Assertions.assertTrue(repository instanceof
IsolationExecutorRepository);
+ Assertions.assertEquals(executor1,
repository.getExecutor(serviceConfig1.toUrl()));
+ Assertions.assertEquals(executor2,
repository.getExecutor(serviceConfig2.toUrl()));
+ // the default executor of serviceConfig3 is built using the
threadpool parameter of the protocol
+ ThreadPoolExecutor executor3 = (ThreadPoolExecutor)
repository.getExecutor(serviceConfig3.toUrl());
+ Assertions.assertTrue(executor3.getThreadFactory() instanceof
NamedInternalThreadFactory);
+ NamedInternalThreadFactory threadFactory3 =
(NamedInternalThreadFactory) executor3.getThreadFactory();
+
+ // rpc invoke with dubbo protocol
+ DemoService demoServiceV1 =
consumerContext.getBean("dubbo-demoServiceV1", DemoService.class);
+ HelloService helloServiceV2 =
consumerContext.getBean("dubbo-helloServiceV2", HelloService.class);
+ HelloService helloServiceV3 =
consumerContext.getBean("dubbo-helloServiceV3", HelloService.class);
+ rpcInvoke(demoServiceV1, helloServiceV2, helloServiceV3);
+
+ // rpc invoke with tri protocol
+ demoServiceV1 = consumerContext.getBean("tri-demoServiceV1",
DemoService.class);
+ helloServiceV2 = consumerContext.getBean("tri-helloServiceV2",
HelloService.class);
+ helloServiceV3 = consumerContext.getBean("tri-helloServiceV3",
HelloService.class);
+ rpcInvoke(demoServiceV1, helloServiceV2, helloServiceV3);
+
+ // Verify that when the provider accepts different service requests,
+ // whether to use the respective executor(threadFactory) of different
services to create threads
+ AtomicInteger threadNum1 = threadFactory1.getThreadNum();
+ AtomicInteger threadNum2 = threadFactory2.getThreadNum();
+ AtomicInteger threadNum3 = threadFactory3.getThreadNum();
+ // assertEquals takes (expected, actual); the expected literal goes first
+ // so that a failure message reports the two values the right way round.
+ Assertions.assertEquals(11, threadNum1.get());
+ Assertions.assertEquals(101, threadNum2.get());
+ Assertions.assertEquals(201, threadNum3.get());
+ }
+
+ /**
+ * Calls each of the three given service references 250 times.
+ */
+ private void rpcInvoke(DemoService demoServiceV1, HelloService
helloServiceV2, HelloService helloServiceV3) {
+ for (int i = 0; i < 250; i++) {
+ demoServiceV1.sayName("name");
+ }
+ for (int i = 0; i < 250; i++) {
+ helloServiceV2.sayHello("hello");
+ }
+ for (int i = 0; i < 250; i++) {
+ helloServiceV3.sayHello("hello");
+ }
+ }
+
+}
diff --git
a/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/AnnotationIsolationTest.java
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/AnnotationIsolationTest.java
new file mode 100644
index 0000000000..3a994e937a
--- /dev/null
+++
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/AnnotationIsolationTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.config.spring.isolation.spring.annotation;
+
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.ProtocolConfig;
+import org.apache.dubbo.config.RegistryConfig;
+import org.apache.dubbo.config.ServiceConfig;
+import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
+import org.apache.dubbo.config.spring.isolation.spring.BaseTest;
+import
org.apache.dubbo.config.spring.isolation.spring.support.DemoServiceExecutor;
+import
org.apache.dubbo.config.spring.isolation.spring.support.HelloServiceExecutor;
+import org.junit.jupiter.api.Test;
+import
org.springframework.context.annotation.AnnotationConfigApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_ISOLATION;
+
+/**
+ * Starts a provider and a consumer Spring context from annotation-based configuration
+ * and delegates the executor-isolation checks to {@code assertExecutor} inherited from
+ * {@link BaseTest}.
+ * <p>
+ * Both configurations register against {@code zookeeper://127.0.0.1:2181}.
+ */
+public class AnnotationIsolationTest extends BaseTest {
+
+    /**
+     * Boots both contexts, captures the three provider-side {@link ServiceConfig} beans
+     * and runs the inherited assertions.
+     * <p>
+     * The contexts are closed in a {@code finally} block so that a failed start or a
+     * failed assertion does not leave them open. The close order (provider first, then
+     * consumer) is unchanged.
+     */
+    @Test
+    public void test() throws Exception {
+        AnnotationConfigApplicationContext providerContext = null;
+        AnnotationConfigApplicationContext consumerContext = null;
+        try {
+            // start provider app
+            providerContext = new AnnotationConfigApplicationContext(ProviderConfiguration.class);
+            providerContext.start();
+
+            // start consumer app
+            consumerContext = new AnnotationConfigApplicationContext(ConsumerConfiguration.class);
+            consumerContext.start();
+
+            // getAndSet serviceConfig
+            setServiceConfig(providerContext);
+
+            // assert isolation of executor
+            assertExecutor(providerContext, consumerContext);
+        } finally {
+            // close context; the nested finally guarantees the consumer is closed
+            // even when closing the provider throws
+            try {
+                if (providerContext != null) {
+                    providerContext.close();
+                }
+            } finally {
+                if (consumerContext != null) {
+                    consumerContext.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Looks up the three exported services by their ServiceBean bean names and stores
+     * them in the {@code serviceConfig1..3} fields (declared outside this class).
+     *
+     * @param providerContext the started provider context
+     */
+    private void setServiceConfig(AnnotationConfigApplicationContext providerContext) {
+        Map<String, ServiceConfig> serviceConfigMap = providerContext.getBeansOfType(ServiceConfig.class);
+        serviceConfig1 = serviceConfigMap.get("ServiceBean:org.apache.dubbo.config.spring.api.DemoService:1.0.0:Group1");
+        serviceConfig2 = serviceConfigMap.get("ServiceBean:org.apache.dubbo.config.spring.api.HelloService:2.0.0:Group2");
+        serviceConfig3 = serviceConfigMap.get("ServiceBean:org.apache.dubbo.config.spring.api.HelloService:3.0.0:Group3");
+    }
+
+    // note scanBasePackages, refer three service with dubbo and tri protocol
+    // NOTE(review): the @EnableDubbo package below is not one of this test's packages;
+    // the consumer beans are picked up through @ComponentScan. Looks like a leftover - confirm.
+    @Configuration
+    @EnableDubbo(scanBasePackages = "org.apache.dubbo.demo.consumer.comp")
+    @ComponentScan(value = {"org.apache.dubbo.config.spring.isolation.spring.annotation.consumer"})
+    static class ConsumerConfiguration {
+        @Bean
+        public RegistryConfig registryConfig() {
+            RegistryConfig registryConfig = new RegistryConfig();
+            registryConfig.setAddress("zookeeper://127.0.0.1:2181");
+            return registryConfig;
+        }
+
+        @Bean
+        public ApplicationConfig applicationConfig() {
+            ApplicationConfig applicationConfig = new ApplicationConfig("consumer-app");
+            return applicationConfig;
+        }
+    }
+
+    // note scanBasePackages, expose three service with dubbo and tri protocol
+    @Configuration
+    @EnableDubbo(scanBasePackages = "org.apache.dubbo.config.spring.isolation.spring.annotation.provider")
+    static class ProviderConfiguration {
+        @Bean
+        public RegistryConfig registryConfig() {
+            RegistryConfig registryConfig = new RegistryConfig();
+            registryConfig.setAddress("zookeeper://127.0.0.1:2181");
+            return registryConfig;
+        }
+
+        // NOTE: we need config executor-management-mode="isolation"
+        @Bean
+        public ApplicationConfig applicationConfig() {
+            ApplicationConfig applicationConfig = new ApplicationConfig("provider-app");
+
+            applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_ISOLATION);
+            return applicationConfig;
+        }
+
+        // expose services with dubbo protocol
+        @Bean
+        public ProtocolConfig dubbo() {
+            ProtocolConfig protocolConfig = new ProtocolConfig("dubbo");
+            return protocolConfig;
+        }
+
+        // expose services with tri protocol
+        @Bean
+        public ProtocolConfig tri() {
+            ProtocolConfig protocolConfig = new ProtocolConfig("tri");
+            return protocolConfig;
+        }
+
+        // customized thread pool
+        @Bean("executor-demo-service")
+        public Executor demoServiceExecutor() {
+            return new DemoServiceExecutor();
+        }
+
+        // customized thread pool
+        @Bean("executor-hello-service")
+        public Executor helloServiceExecutor() {
+            return new HelloServiceExecutor();
+        }
+    }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/consumer/dubbo/DemoServiceV1.java
similarity index 54%
copy from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
copy to
dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/consumer/dubbo/DemoServiceV1.java
index b9496c0c1a..9c40bb1953 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
+++
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/consumer/dubbo/DemoServiceV1.java
@@ -14,24 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package
org.apache.dubbo.config.spring.isolation.spring.annotation.consumer.dubbo;
-package org.apache.dubbo.rpc.protocol.tri.stream;
+import org.apache.dubbo.config.annotation.DubboReference;
+import org.apache.dubbo.config.spring.api.Box;
+import org.apache.dubbo.config.spring.api.DemoService;
+import org.springframework.stereotype.Component;
-import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
-import org.apache.dubbo.rpc.model.FrameworkModel;
+/**
+ * Consumer-side Spring bean named "dubbo-demoServiceV1" that forwards {@code sayName}
+ * to the injected {@code DemoService} reference (version 1.0.0, group Group1,
+ * scope "remote", protocol "dubbo").
+ */
+@Component("dubbo-demoServiceV1")
+public class DemoServiceV1 implements DemoService {
+ @DubboReference(version = "1.0.0", group = "Group1", scope = "remote",
protocol = "dubbo")
+ private DemoService demoService;
-import java.util.concurrent.Executor;
-
-/**
- * An abstract stream implementation.
- */
-public abstract class AbstractStream implements Stream {
-
- protected final Executor executor;
- protected final FrameworkModel frameworkModel;
+ // Delegates to the injected reference and returns its result unchanged.
+ @Override
+ public String sayName(String name) {
+ return demoService.sayName(name);
+ }
- public AbstractStream(Executor executor, FrameworkModel frameworkModel) {
- this.executor = new SerializingExecutor(executor);
- this.frameworkModel = frameworkModel;
+ // Not forwarded to the reference; always returns null.
+ @Override
+ public Box getBox() {
+ return null;
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/consumer/dubbo/HelloServiceV2.java
similarity index 58%
copy from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
copy to
dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/consumer/dubbo/HelloServiceV2.java
index b9496c0c1a..a70b7884e6 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
+++
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/consumer/dubbo/HelloServiceV2.java
@@ -14,24 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package
org.apache.dubbo.config.spring.isolation.spring.annotation.consumer.dubbo;
-package org.apache.dubbo.rpc.protocol.tri.stream;
+import org.apache.dubbo.config.annotation.DubboReference;
+import org.apache.dubbo.config.spring.api.HelloService;
+import org.springframework.stereotype.Component;
-import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
-import org.apache.dubbo.rpc.model.FrameworkModel;
+/**
+ * Consumer-side Spring bean named "dubbo-helloServiceV2" that forwards {@code sayHello}
+ * to the injected {@code HelloService} reference (version 2.0.0, group Group2,
+ * scope "remote", protocol "dubbo").
+ */
+@Component("dubbo-helloServiceV2")
+public class HelloServiceV2 implements HelloService {
+ @DubboReference(version = "2.0.0", group = "Group2", scope = "remote",
protocol = "dubbo")
+ private HelloService helloService;
-import java.util.concurrent.Executor;
-
-/**
- * An abstract stream implementation.
- */
-public abstract class AbstractStream implements Stream {
-
- protected final Executor executor;
- protected final FrameworkModel frameworkModel;
-
- public AbstractStream(Executor executor, FrameworkModel frameworkModel) {
- this.executor = new SerializingExecutor(executor);
- this.frameworkModel = frameworkModel;
+ // Delegates to the injected reference and returns its result unchanged.
+ @Override
+ public String sayHello(String name) {
+ return helloService.sayHello(name);
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/consumer/dubbo/HelloServiceV3.java
similarity index 58%
copy from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
copy to
dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/consumer/dubbo/HelloServiceV3.java
index b9496c0c1a..e807ffeab2 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
+++
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/consumer/dubbo/HelloServiceV3.java
@@ -14,24 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package
org.apache.dubbo.config.spring.isolation.spring.annotation.consumer.dubbo;
-package org.apache.dubbo.rpc.protocol.tri.stream;
+import org.apache.dubbo.config.annotation.DubboReference;
+import org.apache.dubbo.config.spring.api.HelloService;
+import org.springframework.stereotype.Component;
-import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
-import org.apache.dubbo.rpc.model.FrameworkModel;
+/**
+ * Consumer-side Spring bean named "dubbo-helloServiceV3" that forwards {@code sayHello}
+ * to the injected {@code HelloService} reference (version 3.0.0, group Group3,
+ * scope "remote", protocol "dubbo").
+ */
+@Component("dubbo-helloServiceV3")
+public class HelloServiceV3 implements HelloService {
+ @DubboReference(version = "3.0.0", group = "Group3", scope = "remote",
protocol = "dubbo")
+ private HelloService helloService;
-import java.util.concurrent.Executor;
-
-/**
- * An abstract stream implementation.
- */
-public abstract class AbstractStream implements Stream {
-
- protected final Executor executor;
- protected final FrameworkModel frameworkModel;
-
- public AbstractStream(Executor executor, FrameworkModel frameworkModel) {
- this.executor = new SerializingExecutor(executor);
- this.frameworkModel = frameworkModel;
+ // Delegates to the injected reference and returns its result unchanged.
+ @Override
+ public String sayHello(String name) {
+ return helloService.sayHello(name);
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/consumer/tri/DemoServiceV1.java
similarity index 54%
copy from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
copy to
dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/consumer/tri/DemoServiceV1.java
index b9496c0c1a..cd8131f717 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
+++
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/consumer/tri/DemoServiceV1.java
@@ -14,24 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package
org.apache.dubbo.config.spring.isolation.spring.annotation.consumer.tri;
-package org.apache.dubbo.rpc.protocol.tri.stream;
+import org.apache.dubbo.config.annotation.DubboReference;
+import org.apache.dubbo.config.spring.api.Box;
+import org.apache.dubbo.config.spring.api.DemoService;
+import org.springframework.stereotype.Component;
-import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
-import org.apache.dubbo.rpc.model.FrameworkModel;
+/**
+ * Consumer-side Spring bean named "tri-demoServiceV1" that forwards {@code sayName}
+ * to the injected {@code DemoService} reference (version 1.0.0, group Group1,
+ * scope "remote", protocol "tri").
+ */
+@Component("tri-demoServiceV1")
+public class DemoServiceV1 implements DemoService {
+ @DubboReference(version = "1.0.0",group = "Group1", scope = "remote",
protocol = "tri")
+ private DemoService demoService;
-import java.util.concurrent.Executor;
-
-/**
- * An abstract stream implementation.
- */
-public abstract class AbstractStream implements Stream {
-
- protected final Executor executor;
- protected final FrameworkModel frameworkModel;
+ // Delegates to the injected reference and returns its result unchanged.
+ @Override
+ public String sayName(String name) {
+ return demoService.sayName(name);
+ }
- public AbstractStream(Executor executor, FrameworkModel frameworkModel) {
- this.executor = new SerializingExecutor(executor);
- this.frameworkModel = frameworkModel;
+ // Not forwarded to the reference; always returns null.
+ @Override
+ public Box getBox() {
+ return null;
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/consumer/tri/HelloServiceV2.java
similarity index 58%
copy from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
copy to
dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/consumer/tri/HelloServiceV2.java
index b9496c0c1a..6c3c4c2dad 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
+++
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/consumer/tri/HelloServiceV2.java
@@ -14,24 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package
org.apache.dubbo.config.spring.isolation.spring.annotation.consumer.tri;
-package org.apache.dubbo.rpc.protocol.tri.stream;
+import org.apache.dubbo.config.annotation.DubboReference;
+import org.apache.dubbo.config.spring.api.HelloService;
+import org.springframework.stereotype.Component;
-import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
-import org.apache.dubbo.rpc.model.FrameworkModel;
+/**
+ * Consumer-side Spring bean named "tri-helloServiceV2" that forwards {@code sayHello}
+ * to the injected {@code HelloService} reference (version 2.0.0, group Group2,
+ * scope "remote", protocol "tri").
+ */
+@Component("tri-helloServiceV2")
+public class HelloServiceV2 implements HelloService {
+ @DubboReference(version = "2.0.0", group = "Group2", scope = "remote",
protocol = "tri")
+ private HelloService helloService;
-import java.util.concurrent.Executor;
-
-/**
- * An abstract stream implementation.
- */
-public abstract class AbstractStream implements Stream {
-
- protected final Executor executor;
- protected final FrameworkModel frameworkModel;
-
- public AbstractStream(Executor executor, FrameworkModel frameworkModel) {
- this.executor = new SerializingExecutor(executor);
- this.frameworkModel = frameworkModel;
+ // Delegates to the injected reference and returns its result unchanged.
+ @Override
+ public String sayHello(String name) {
+ return helloService.sayHello(name);
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/consumer/tri/HelloServiceV3.java
similarity index 58%
copy from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
copy to
dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/consumer/tri/HelloServiceV3.java
index b9496c0c1a..61812f868c 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
+++
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/consumer/tri/HelloServiceV3.java
@@ -14,24 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package
org.apache.dubbo.config.spring.isolation.spring.annotation.consumer.tri;
-package org.apache.dubbo.rpc.protocol.tri.stream;
+import org.apache.dubbo.config.annotation.DubboReference;
+import org.apache.dubbo.config.spring.api.HelloService;
+import org.springframework.stereotype.Component;
-import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
-import org.apache.dubbo.rpc.model.FrameworkModel;
+/**
+ * Consumer-side Spring bean named "tri-helloServiceV3" that forwards {@code sayHello}
+ * to the injected {@code HelloService} reference (version 3.0.0, group Group3,
+ * scope "remote", protocol "tri").
+ */
+@Component("tri-helloServiceV3")
+public class HelloServiceV3 implements HelloService {
+ @DubboReference(version = "3.0.0", group = "Group3", scope = "remote",
protocol = "tri")
+ private HelloService helloService;
-import java.util.concurrent.Executor;
-
-/**
- * An abstract stream implementation.
- */
-public abstract class AbstractStream implements Stream {
-
- protected final Executor executor;
- protected final FrameworkModel frameworkModel;
-
- public AbstractStream(Executor executor, FrameworkModel frameworkModel) {
- this.executor = new SerializingExecutor(executor);
- this.frameworkModel = frameworkModel;
+ // Delegates to the injected reference and returns its result unchanged.
+ @Override
+ public String sayHello(String name) {
+ return helloService.sayHello(name);
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/provider/DemoServiceImplV1.java
similarity index 58%
copy from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
copy to
dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/provider/DemoServiceImplV1.java
index b9496c0c1a..ca0614e9a4 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
+++
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/provider/DemoServiceImplV1.java
@@ -14,24 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.dubbo.config.spring.isolation.spring.annotation.provider;
-package org.apache.dubbo.rpc.protocol.tri.stream;
+import org.apache.dubbo.config.annotation.DubboService;
+import org.apache.dubbo.config.spring.api.Box;
+import org.apache.dubbo.config.spring.api.DemoService;
-import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
-import org.apache.dubbo.rpc.model.FrameworkModel;
+/**
+ * Provider-side {@code DemoService} implementation exported as version 1.0.0,
+ * group Group1, with the {@code executor} attribute set to "executor-demo-service".
+ */
+@DubboService(executor = "executor-demo-service", version = "1.0.0", group =
"Group1")
+public class DemoServiceImplV1 implements DemoService {
-import java.util.concurrent.Executor;
-
-/**
- * An abstract stream implementation.
- */
-public abstract class AbstractStream implements Stream {
-
- protected final Executor executor;
- protected final FrameworkModel frameworkModel;
+ // Ignores the argument and returns the constant "server name".
+ @Override
+ public String sayName(String name) {
+ return "server name";
+ }
- public AbstractStream(Executor executor, FrameworkModel frameworkModel) {
- this.executor = new SerializingExecutor(executor);
- this.frameworkModel = frameworkModel;
+ // Always returns null.
+ @Override
+ public Box getBox() {
+ return null;
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/provider/HelloServiceImplV2.java
similarity index 58%
copy from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
copy to
dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/provider/HelloServiceImplV2.java
index b9496c0c1a..1d23403c77 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
+++
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/provider/HelloServiceImplV2.java
@@ -14,24 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.dubbo.config.spring.isolation.spring.annotation.provider;
-package org.apache.dubbo.rpc.protocol.tri.stream;
+import org.apache.dubbo.config.annotation.DubboService;
+import org.apache.dubbo.config.spring.api.HelloService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
-import org.apache.dubbo.rpc.model.FrameworkModel;
+/**
+ * Provider-side {@code HelloService} implementation exported as version 3.0.0,
+ * group Group3. No {@code executor} attribute is set on the annotation.
+ * <p>
+ * NOTE(review): the class name says "V2" but the exported version is 3.0.0, while
+ * HelloServiceImplV3 exports 2.0.0 - confirm whether the naming is intentional.
+ */
+@DubboService(version = "3.0.0", group = "Group3")
+public class HelloServiceImplV2 implements HelloService {
+ // NOTE(review): logger is not used anywhere in this class.
+ private static final Logger logger =
LoggerFactory.getLogger(HelloServiceImplV2.class);
-import java.util.concurrent.Executor;
-
-/**
- * An abstract stream implementation.
- */
-public abstract class AbstractStream implements Stream {
-
- protected final Executor executor;
- protected final FrameworkModel frameworkModel;
-
- public AbstractStream(Executor executor, FrameworkModel frameworkModel) {
- this.executor = new SerializingExecutor(executor);
- this.frameworkModel = frameworkModel;
+ // Ignores the argument and returns the constant "server hello".
+ @Override
+ public String sayHello(String name) {
+ return "server hello";
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/provider/HelloServiceImplV3.java
similarity index 58%
copy from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
copy to
dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/provider/HelloServiceImplV3.java
index b9496c0c1a..a99bf24754 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
+++
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/annotation/provider/HelloServiceImplV3.java
@@ -14,24 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.dubbo.config.spring.isolation.spring.annotation.provider;
-package org.apache.dubbo.rpc.protocol.tri.stream;
+import org.apache.dubbo.config.annotation.DubboService;
+import org.apache.dubbo.config.spring.api.HelloService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
-import org.apache.dubbo.rpc.model.FrameworkModel;
+/**
+ * Provider-side {@code HelloService} implementation exported as version 2.0.0,
+ * group Group2, with the {@code executor} attribute set to "executor-hello-service".
+ * <p>
+ * NOTE(review): the class name says "V3" but the exported version is 2.0.0, while
+ * HelloServiceImplV2 exports 3.0.0 - confirm whether the naming is intentional.
+ */
+@DubboService(executor = "executor-hello-service", version = "2.0.0", group =
"Group2")
+public class HelloServiceImplV3 implements HelloService {
+ // NOTE(review): logger is not used anywhere in this class.
+ private static final Logger logger =
LoggerFactory.getLogger(HelloServiceImplV3.class);
-import java.util.concurrent.Executor;
-
-/**
- * An abstract stream implementation.
- */
-public abstract class AbstractStream implements Stream {
-
- protected final Executor executor;
- protected final FrameworkModel frameworkModel;
-
- public AbstractStream(Executor executor, FrameworkModel frameworkModel) {
- this.executor = new SerializingExecutor(executor);
- this.frameworkModel = frameworkModel;
+ // Ignores the argument and returns the constant "server hello".
+ @Override
+ public String sayHello(String name) {
+ return "server hello";
}
+
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/support/DemoServiceExecutor.java
similarity index 58%
copy from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
copy to
dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/support/DemoServiceExecutor.java
index b9496c0c1a..099b51db02 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
+++
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/support/DemoServiceExecutor.java
@@ -14,24 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.dubbo.config.spring.isolation.spring.support;
-package org.apache.dubbo.rpc.protocol.tri.stream;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
-import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
-import org.apache.dubbo.rpc.model.FrameworkModel;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
-import java.util.concurrent.Executor;
-
-/**
- * An abstract stream implementation.
- */
-public abstract class AbstractStream implements Stream {
-
- protected final Executor executor;
- protected final FrameworkModel frameworkModel;
-
- public AbstractStream(Executor executor, FrameworkModel frameworkModel) {
- this.executor = new SerializingExecutor(executor);
- this.frameworkModel = frameworkModel;
+/**
+ * {@link ThreadPoolExecutor} preconfigured with core and maximum pool size 10,
+ * a 60-second keep-alive, a {@link LinkedBlockingDeque} created with its no-arg
+ * constructor as work queue, and a {@code NamedThreadFactory} constructed with
+ * "DemoServiceExecutor".
+ */
+public class DemoServiceExecutor extends ThreadPoolExecutor {
+ // All settings are fixed; callers cannot override them.
+ public DemoServiceExecutor() {
+ super(10, 10, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(),
+ new NamedThreadFactory("DemoServiceExecutor"));
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/support/HelloServiceExecutor.java
similarity index 58%
copy from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
copy to
dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/support/HelloServiceExecutor.java
index b9496c0c1a..1ba6eca5c4 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
+++
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/support/HelloServiceExecutor.java
@@ -14,24 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.dubbo.config.spring.isolation.spring.support;
-package org.apache.dubbo.rpc.protocol.tri.stream;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
-import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
-import org.apache.dubbo.rpc.model.FrameworkModel;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
-import java.util.concurrent.Executor;
-
-/**
- * An abstract stream implementation.
- */
-public abstract class AbstractStream implements Stream {
-
- protected final Executor executor;
- protected final FrameworkModel frameworkModel;
-
- public AbstractStream(Executor executor, FrameworkModel frameworkModel) {
- this.executor = new SerializingExecutor(executor);
- this.frameworkModel = frameworkModel;
+/**
+ * {@link ThreadPoolExecutor} preconfigured with core and maximum pool size 100,
+ * a 60-second keep-alive, a {@link LinkedBlockingDeque} created with its no-arg
+ * constructor as work queue, and a {@code NamedThreadFactory} constructed with
+ * "HelloServiceExecutor".
+ */
+public class HelloServiceExecutor extends ThreadPoolExecutor {
+ // All settings are fixed; callers cannot override them.
+ public HelloServiceExecutor() {
+ super(100, 100, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(),
+ new NamedThreadFactory("HelloServiceExecutor"));
}
+
}
diff --git
a/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/xml/XmlIsolationTest.java
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/xml/XmlIsolationTest.java
new file mode 100644
index 0000000000..61d5ed3fb3
--- /dev/null
+++
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/isolation/spring/xml/XmlIsolationTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.config.spring.isolation.spring.xml;
+
+import org.apache.dubbo.config.ServiceConfig;
+import org.apache.dubbo.config.spring.isolation.spring.BaseTest;
+import org.junit.jupiter.api.Test;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import java.util.Map;
+
+/**
+ * Starts a provider and a consumer Spring context from the XML files under
+ * {@code META-INF/isolation/} and delegates the executor-isolation checks to
+ * {@code assertExecutor} inherited from {@link BaseTest}.
+ */
+public class XmlIsolationTest extends BaseTest {
+
+    /**
+     * Boots both contexts, captures the three provider-side {@link ServiceConfig} beans
+     * and runs the inherited assertions.
+     * <p>
+     * The contexts are closed in a {@code finally} block so that a failed start or a
+     * failed assertion does not leave them open. The close order (provider first, then
+     * consumer) is unchanged.
+     */
+    @Test
+    public void test() throws Exception {
+        ClassPathXmlApplicationContext providerContext = null;
+        ClassPathXmlApplicationContext consumerContext = null;
+        try {
+            // start provider app
+            providerContext = new ClassPathXmlApplicationContext("META-INF/isolation/dubbo-provider.xml");
+            providerContext.start();
+
+            // start consumer app
+            consumerContext = new ClassPathXmlApplicationContext("META-INF/isolation/dubbo-consumer.xml");
+            consumerContext.start();
+
+            // getAndSet serviceConfig
+            setServiceConfig(providerContext);
+
+            // assert isolation of executor
+            assertExecutor(providerContext, consumerContext);
+        } finally {
+            // close context; the nested finally guarantees the consumer is closed
+            // even when closing the provider throws
+            try {
+                if (providerContext != null) {
+                    providerContext.close();
+                }
+            } finally {
+                if (consumerContext != null) {
+                    consumerContext.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Looks up the three exported services by their generated ServiceBean bean names
+     * ({@code #0}, {@code #1}, {@code #2}) and stores them in the
+     * {@code serviceConfig1..3} fields (declared outside this class).
+     *
+     * @param providerContext the started provider context
+     */
+    private void setServiceConfig(ClassPathXmlApplicationContext providerContext) {
+        Map<String, ServiceConfig> serviceConfigMap = providerContext.getBeansOfType(ServiceConfig.class);
+        serviceConfig1 = serviceConfigMap.get("org.apache.dubbo.config.spring.ServiceBean#0");
+        serviceConfig2 = serviceConfigMap.get("org.apache.dubbo.config.spring.ServiceBean#1");
+        serviceConfig3 = serviceConfigMap.get("org.apache.dubbo.config.spring.ServiceBean#2");
+    }
+
+}
diff --git
a/dubbo-config/dubbo-config-spring/src/test/resources/META-INF/isolation/dubbo-consumer.xml
b/dubbo-config/dubbo-config-spring/src/test/resources/META-INF/isolation/dubbo-consumer.xml
new file mode 100644
index 0000000000..e442c3eb66
--- /dev/null
+++
b/dubbo-config/dubbo-config-spring/src/test/resources/META-INF/isolation/dubbo-consumer.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
+ xmlns="http://www.springframework.org/schema/beans"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
+ http://dubbo.apache.org/schema/dubbo
http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
+
+ <dubbo:application name="demo-consumer">
+ </dubbo:application>
+
+ <dubbo:metadata-report address="zookeeper://127.0.0.1:2181"/>
+
+ <dubbo:registry id="demo1"
address="zookeeper://127.0.0.1:2181?registry-type=service"/>
+
+ <!-- refer with dubbo protocol-->
+ <dubbo:reference version="1.0.0" group="Group1" id="dubbo-demoServiceV1"
check="false" scope="remote"
+
interface="org.apache.dubbo.config.spring.api.DemoService" protocol="dubbo"/>
+
+ <dubbo:reference version="2.0.0" group="Group2" id="dubbo-helloServiceV2"
check="false" scope="remote"
+
interface="org.apache.dubbo.config.spring.api.HelloService" protocol="dubbo"/>
+
+ <dubbo:reference version="3.0.0" group="Group3" id="dubbo-helloServiceV3"
check="false" scope="remote"
+
interface="org.apache.dubbo.config.spring.api.HelloService" protocol="dubbo"/>
+
+ <!-- refer with tri protocol-->
+ <dubbo:reference version="1.0.0" group="Group1" id="tri-demoServiceV1"
check="false" scope="remote"
+
interface="org.apache.dubbo.config.spring.api.DemoService" protocol="tri"/>
+
+ <dubbo:reference version="2.0.0" group="Group2" id="tri-helloServiceV2"
check="false" scope="remote"
+
interface="org.apache.dubbo.config.spring.api.HelloService" protocol="tri"/>
+
+ <dubbo:reference version="3.0.0" group="Group3" id="tri-helloServiceV3"
check="false" scope="remote"
+
interface="org.apache.dubbo.config.spring.api.HelloService" protocol="tri"/>
+
+</beans>
diff --git
a/dubbo-config/dubbo-config-spring/src/test/resources/META-INF/isolation/dubbo-provider.xml
b/dubbo-config/dubbo-config-spring/src/test/resources/META-INF/isolation/dubbo-provider.xml
new file mode 100644
index 0000000000..cff46ead4a
--- /dev/null
+++
b/dubbo-config/dubbo-config-spring/src/test/resources/META-INF/isolation/dubbo-provider.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
+ xmlns="http://www.springframework.org/schema/beans"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
+ http://dubbo.apache.org/schema/dubbo
http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
+
+ <!-- NOTE: executor-management-mode="isolation" must be configured on the application -->
+ <dubbo:application name="demo-provider"
executor-management-mode="isolation">
+ </dubbo:application>
+
+ <dubbo:config-center address="zookeeper://127.0.0.1:2181"/>
+ <dubbo:metadata-report address="zookeeper://127.0.0.1:2181"/>
+ <dubbo:registry id="registry1"
address="zookeeper://127.0.0.1:2181?registry-type=service"/>
+
+ <dubbo:protocol name="dubbo" port="-1"/>
+ <dubbo:protocol name="tri" port="-1"/>
+
+ <!-- expose three services over both the dubbo and tri protocols -->
+ <bean id="demoServiceV1"
class="org.apache.dubbo.config.spring.impl.DemoServiceImpl"/>
+ <bean id="helloServiceV2"
class="org.apache.dubbo.config.spring.impl.HelloServiceImpl"/>
+ <bean id="helloServiceV3"
class="org.apache.dubbo.config.spring.impl.HelloServiceImpl"/>
+
+ <!-- customized thread pool -->
+ <bean id="executor-demo-service"
+
class="org.apache.dubbo.config.spring.isolation.spring.support.DemoServiceExecutor"/>
+ <bean id="executor-hello-service"
+
class="org.apache.dubbo.config.spring.isolation.spring.support.HelloServiceExecutor"/>
+
+ <!-- this service uses [executor="executor-demo-service"] as its isolated thread pool -->
+ <dubbo:service executor="executor-demo-service"
+ interface="org.apache.dubbo.config.spring.api.DemoService"
version="1.0.0" group="Group1"
+ timeout="3000" ref="demoServiceV1" registry="registry1"
protocol="dubbo,tri"/>
+
+ <!-- this service uses [executor="executor-hello-service"] as its isolated thread pool -->
+ <dubbo:service executor="executor-hello-service"
+ interface="org.apache.dubbo.config.spring.api.HelloService"
version="2.0.0" group="Group2"
+ timeout="5000" ref="helloServiceV2" registry="registry1"
protocol="dubbo,tri"/>
+
+ <!-- no executor is set for this service, so it uses the default executor built from the threadpool parameter of the protocolConfig -->
+ <dubbo:service interface="org.apache.dubbo.config.spring.api.HelloService"
version="3.0.0" group="Group3"
+ timeout="5000" ref="helloServiceV3" registry="registry1"
protocol="dubbo,tri"/>
+
+</beans>
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
index 40185dcb68..f265a71246 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
@@ -103,8 +103,7 @@ public abstract class AbstractClient extends
AbstractEndpoint implements Client
}
private void initExecutor(URL url) {
- ExecutorRepository executorRepository =
url.getOrDefaultApplicationModel()
-
.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+ ExecutorRepository executorRepository =
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel());
/**
* Consumer's executor is shared globally, provider ip doesn't need to
be part of the thread name.
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
index bd6a91b72d..9ed96920a0 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
@@ -55,7 +55,7 @@ public abstract class AbstractServer extends AbstractEndpoint
implements Remotin
public AbstractServer(URL url, ChannelHandler handler) throws
RemotingException {
super(url, handler);
- executorRepository =
url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+ executorRepository =
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel());
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY,
getUrl().getHost());
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java
index a82a634688..00d41742f5 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java
@@ -29,7 +29,9 @@ import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
import org.apache.dubbo.remoting.transport.ChannelHandlerDelegate;
import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.executor.ExecutorSupport;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
public class WrappedChannelHandler implements ChannelHandlerDelegate {
@@ -40,9 +42,12 @@ public class WrappedChannelHandler implements
ChannelHandlerDelegate {
protected final URL url;
+ protected final ExecutorSupport executorSupport;
+
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
+ this.executorSupport =
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()).getExecutorSupport(url);
}
public void close() {
@@ -121,15 +126,24 @@ public class WrappedChannelHandler implements
ChannelHandlerDelegate {
} else {
ExecutorService executor = responseFuture.getExecutor();
if (executor == null || executor.isShutdown()) {
- executor = getSharedExecutorService();
+ executor = getSharedExecutorService(msg);
}
return executor;
}
} else {
- return getSharedExecutorService();
+ return getSharedExecutorService(msg);
}
}
+    /**
+     * Resolves the executor that should handle the given message.
+     *
+     * @param msg the network message body; it is handed to {@code executorSupport.getExecutor},
+     *            which reads the information it needs from it to pick an executor
+     * @return the executor chosen by {@code executorSupport} when it is an {@link ExecutorService};
+     *         otherwise the shared executor for the current Server or Client
+     */
+    public ExecutorService getSharedExecutorService(Object msg) {
+        Executor executor = executorSupport.getExecutor(msg);
+        // ExecutorSupport only promises an Executor, so check the type instead of casting blindly:
+        // null or a plain Executor falls back to the shared pool rather than failing with a
+        // ClassCastException while the message is being dispatched.
+        if (executor instanceof ExecutorService) {
+            return (ExecutorService) executor;
+        }
+        return getSharedExecutorService();
+    }
+
/**
* get the shared executor for current Server or Client
*
@@ -145,8 +159,7 @@ public class WrappedChannelHandler implements
ChannelHandlerDelegate {
// note: url.getOrDefaultApplicationModel() may create new application
model
ApplicationModel applicationModel = url.getOrDefaultApplicationModel();
- ExecutorRepository executorRepository =
-
applicationModel.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+ ExecutorRepository executorRepository =
ExecutorRepository.getInstance(applicationModel);
ExecutorService executor = executorRepository.getExecutor(url);
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index eacc24f09e..f22edbf927 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -302,7 +302,7 @@ public abstract class AbstractInvoker<T> implements
Invoker<T> {
if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
return new ThreadlessExecutor();
}
- return
url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
+ return
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()).getExecutor(url);
}
/**
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
index 913138dc66..43d4db8ec3 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
@@ -24,6 +24,7 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.serialize.ObjectInput;
import org.apache.dubbo.common.serialize.ObjectOutput;
import org.apache.dubbo.common.serialize.Serialization;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.exchange.Request;
@@ -41,6 +42,7 @@ import java.io.IOException;
import java.io.InputStream;
import static
org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_ISOLATION;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
@@ -144,7 +146,7 @@ public class DubboCodec extends ExchangeCodec {
}
} else {
DecodeableRpcInvocation inv;
- if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY,
DEFAULT_DECODE_IN_IO_THREAD)) {
+ if (isDecodeDataInIoThread(channel)) {
inv = new DecodeableRpcInvocation(frameworkModel,
channel, req, is, proto);
inv.decode();
} else {
@@ -167,6 +169,21 @@ public class DubboCodec extends ExchangeCodec {
}
}
+    /**
+     * Decides whether the request body is decoded on the IO thread.
+     * <p>
+     * The value configured under {@code DECODE_IN_IO_THREAD_KEY} is honoured unless the executor
+     * management mode is isolation, in which case decoding always happens on the IO thread.
+     *
+     * @param channel channel whose URL supplies the parameter and the application model
+     * @return {@code true} when the body must be decoded on the IO thread
+     */
+    private boolean isDecodeDataInIoThread(Channel channel) {
+        boolean configured = channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD);
+        String mode = ExecutorRepository.getMode(channel.getUrl().getOrDefaultApplicationModel());
+        if (!EXECUTOR_MANAGEMENT_MODE_ISOLATION.equals(mode)) {
+            return configured;
+        }
+
+        // Why? Obtaining the isolated thread pool requires the serviceKey of the service, and that
+        // part must be decoded before it can be obtained (see DubboIsolationExecutorSupport).
+        if (!configured) {
+            log.info("Because thread pool isolation is enabled on the dubbo protocol, the body can only be decoded " +
+                "on the io thread, and the parameter[" + DECODE_IN_IO_THREAD_KEY + "] will be ignored");
+        }
+        return true;
+    }
+
private byte[] readMessageData(InputStream is) throws IOException {
if (is.available() > 0) {
byte[] result = new byte[is.available()];
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboIsolationExecutorSupport.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboIsolationExecutorSupport.java
new file mode 100644
index 0000000000..861d460701
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboIsolationExecutorSupport.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo;
+
+import org.apache.dubbo.common.ServiceKey;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.exchange.Request;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.executor.AbstractIsolationExecutorSupport;
+
+import java.util.Map;
+
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+
+/**
+ * Isolation executor support for the dubbo protocol: derives the {@link ServiceKey} of an
+ * incoming {@link Request} from the attachments of the {@link Invocation} it carries.
+ */
+public class DubboIsolationExecutorSupport extends AbstractIsolationExecutorSupport {
+    private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(DubboIsolationExecutorSupport.class);
+
+    public DubboIsolationExecutorSupport(URL url) {
+        super(url);
+    }
+
+    /**
+     * Builds the service key from the path, version and group attachments of the invocation.
+     *
+     * @param data the received message; only a {@link Request} whose data is an
+     *             {@link Invocation} yields a key
+     * @return the service key, or {@code null} when {@code data} has a different shape or
+     *         reading it fails (the failure is logged)
+     */
+    @Override
+    protected ServiceKey getServiceKey(Object data) {
+        if (!(data instanceof Request)) {
+            return null;
+        }
+
+        ServiceKey serviceKey = null;
+        try {
+            Object payload = ((Request) data).getData();
+            // instanceof is false for null, so a request without data is skipped here as well
+            if (payload instanceof Invocation) {
+                Map<String, String> attachments = ((Invocation) payload).getAttachments();
+                String interfaceName = attachments.get(PATH_KEY);
+                String version = attachments.get(VERSION_KEY);
+                String group = attachments.get(GROUP_KEY);
+                serviceKey = new ServiceKey(interfaceName, version, group);
+            }
+        } catch (Throwable e) {
+            logger.error("failed to get service key, maybe the build rule for data is wrong, data = " + data, e);
+        }
+        return serviceKey;
+    }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboIsolationExecutorSupportFactory.java
similarity index 58%
copy from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
copy to
dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboIsolationExecutorSupportFactory.java
index b9496c0c1a..febc5de632 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboIsolationExecutorSupportFactory.java
@@ -14,24 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.dubbo.rpc.protocol.dubbo;
-package org.apache.dubbo.rpc.protocol.tri.stream;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.executor.ExecutorSupport;
+import org.apache.dubbo.rpc.executor.IsolationExecutorSupportFactory;
-import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-
-import java.util.concurrent.Executor;
-
-/**
- * An abstract stream implementation.
- */
-public abstract class AbstractStream implements Stream {
-
- protected final Executor executor;
- protected final FrameworkModel frameworkModel;
-
- public AbstractStream(Executor executor, FrameworkModel frameworkModel) {
- this.executor = new SerializingExecutor(executor);
- this.frameworkModel = frameworkModel;
+/**
+ * {@link IsolationExecutorSupportFactory} implementation for the dubbo protocol: every call
+ * returns a new {@link DubboIsolationExecutorSupport} created from the given URL.
+ */
+public class DubboIsolationExecutorSupportFactory implements
IsolationExecutorSupportFactory {
+ @Override
+ public ExecutorSupport createIsolationExecutorSupport(URL url) {
+ return new DubboIsolationExecutorSupport(url);
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.executor.IsolationExecutorSupportFactory
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.executor.IsolationExecutorSupportFactory
new file mode 100644
index 0000000000..86ea9fed5e
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.executor.IsolationExecutorSupportFactory
@@ -0,0 +1 @@
+dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboIsolationExecutorSupportFactory
diff --git
a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java
b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java
index 71587e6dd5..9f9d314747 100644
---
a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java
@@ -74,7 +74,7 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
super(type, url);
this.key = key;
this.exporter = exporter;
- this.executorRepository =
url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+ this.executorRepository =
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel());
this.paramDeepCopyUtil =
url.getOrDefaultFrameworkModel().getExtensionLoader(ParamDeepCopyUtil.class)
.getExtension(url.getParameter(CommonConstants.INJVM_COPY_UTIL_KEY,
DefaultParamDeepCopyUtil.NAME));
this.shouldIgnoreSameModule =
url.getParameter(CommonConstants.INJVM_IGNORE_SAME_MODULE_KEY, false);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 198dcf8abb..170ad30614 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -28,6 +28,7 @@ import org.apache.dubbo.remoting.api.AbstractWireProtocol;
import org.apache.dubbo.remoting.api.pu.ChannelHandlerPretender;
import org.apache.dubbo.remoting.api.pu.ChannelOperator;
import org.apache.dubbo.rpc.HeaderFilter;
+import org.apache.dubbo.rpc.executor.ExecutorSupport;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.ScopeModelAware;
@@ -127,6 +128,7 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
DEFAULT_MAX_HEADER_LIST_SIZE)))
.frameLogger(SERVER_LOGGER)
.build();
+ ExecutorSupport executorSupport =
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()).getExecutorSupport(url);
TripleWriteQueue writeQueue = new TripleWriteQueue();
final Http2MultiplexHandler handler = new Http2MultiplexHandler(
new ChannelInitializer<Http2StreamChannel>() {
@@ -134,7 +136,7 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
protected void initChannel(Http2StreamChannel ch) {
final ChannelPipeline p = ch.pipeline();
p.addLast(new TripleCommandOutBoundHandler());
- p.addLast(new
TripleHttp2FrameServerHandler(frameworkModel, lookupExecutor(url),
+ p.addLast(new
TripleHttp2FrameServerHandler(frameworkModel, executorSupport,
headFilters, ch, writeQueue));
}
});
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
index 5244e9de32..66f347682c 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
@@ -120,9 +120,7 @@ public class TripleProtocol extends AbstractProtocol {
triBuiltinService.getHealthStatusManager()
.setStatus(url.getServiceInterface(),
HealthCheckResponse.ServingStatus.SERVING);
// init
-
url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class)
- .getDefaultExtension()
- .createExecutorIfAbsent(url);
+
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()).createExecutorIfAbsent(url);
PortUnificationExchanger.bind(url, new DefaultPuHandler());
optimizeSerialization(url);
@@ -141,9 +139,7 @@ public class TripleProtocol extends AbstractProtocol {
}
private ExecutorService getOrCreateStreamExecutor(ApplicationModel
applicationModel, URL url) {
- ExecutorService executor =
applicationModel.getExtensionLoader(ExecutorRepository.class)
- .getDefaultExtension()
- .createExecutorIfAbsent(url);
+ ExecutorService executor =
ExecutorRepository.getInstance(applicationModel).createExecutorIfAbsent(url);
Objects.requireNonNull(executor,
String.format("No available executor found in %s", url));
return executor;
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
index b9496c0c1a..22dbba7358 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
@@ -27,11 +27,15 @@ import java.util.concurrent.Executor;
*/
public abstract class AbstractStream implements Stream {
- protected final Executor executor;
+ protected Executor executor;
protected final FrameworkModel frameworkModel;
public AbstractStream(Executor executor, FrameworkModel frameworkModel) {
this.executor = new SerializingExecutor(executor);
this.frameworkModel = frameworkModel;
}
+
+ /**
+ * Replaces the executor of this stream. The given executor is wrapped in a new
+ * {@link SerializingExecutor}, the same way the constructor wraps its argument.
+ *
+ * @param executor the executor to wrap and use from now on
+ */
+ public void setExecutor(Executor executor) {
+ this.executor = new SerializingExecutor(executor);
+ }
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2FrameServerHandler.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2FrameServerHandler.java
index 4e6c992828..08595f090b 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2FrameServerHandler.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2FrameServerHandler.java
@@ -23,6 +23,7 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.HeaderFilter;
import org.apache.dubbo.rpc.PathResolver;
import org.apache.dubbo.rpc.TriRpcStatus;
+import org.apache.dubbo.rpc.executor.ExecutorSupport;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor;
import org.apache.dubbo.rpc.protocol.tri.stream.TripleServerStream;
@@ -44,26 +45,23 @@ public class TripleHttp2FrameServerHandler extends
ChannelDuplexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(
TripleHttp2FrameServerHandler.class);
private final PathResolver pathResolver;
- private final FrameworkModel frameworkModel;
- private final Executor executor;
- private final List<HeaderFilter> filters;
+ private final ExecutorSupport executorSupport;
private final String acceptEncoding;
private final TripleServerStream tripleServerStream;
public TripleHttp2FrameServerHandler(
FrameworkModel frameworkModel,
- Executor executor,
+ ExecutorSupport executorSupport,
List<HeaderFilter> filters,
Http2StreamChannel channel,
TripleWriteQueue writeQueue) {
- this.frameworkModel = frameworkModel;
- this.executor = executor;
- this.filters = filters;
+ this.executorSupport = executorSupport;
this.acceptEncoding = String.join(",",
frameworkModel.getExtensionLoader(DeCompressor.class).getSupportedExtensions());
this.pathResolver =
frameworkModel.getExtensionLoader(PathResolver.class)
.getDefaultExtension();
- tripleServerStream = new TripleServerStream(channel, frameworkModel,
executor, pathResolver, acceptEncoding, filters, writeQueue);
+ // The executor will be assigned in onHeadersRead method
+ tripleServerStream = new TripleServerStream(channel, frameworkModel,
null, pathResolver, acceptEncoding, filters, writeQueue);
}
@Override
@@ -109,6 +107,8 @@ public class TripleHttp2FrameServerHandler extends
ChannelDuplexHandler {
}
public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame
msg) throws Exception {
+ // The stream was constructed with a null executor; pick the executor from the request
+ // headers and install it before the headers are dispatched to the stream.
+ // NOTE(review): WrappedChannelHandler null-checks the result of ExecutorSupport.getExecutor,
+ // while here it is passed straight to setExecutor - confirm it cannot be null on this path.
+ Executor executor = executorSupport.getExecutor(msg.headers());
+ tripleServerStream.setExecutor(executor);
tripleServerStream.transportObserver.onHeader(msg.headers(),
msg.isEndStream());
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleIsolationExecutorSupport.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleIsolationExecutorSupport.java
new file mode 100644
index 0000000000..0068a876d5
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleIsolationExecutorSupport.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.transport;
+
+import io.netty.handler.codec.http2.Http2Headers;
+import org.apache.dubbo.common.ServiceKey;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
+import org.apache.dubbo.rpc.executor.AbstractIsolationExecutorSupport;
+
+/**
+ * Isolation executor support for the triple protocol: derives the {@link ServiceKey} of an
+ * incoming request from its HTTP/2 headers.
+ */
+public class TripleIsolationExecutorSupport extends AbstractIsolationExecutorSupport {
+    private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(TripleIsolationExecutorSupport.class);
+
+    public TripleIsolationExecutorSupport(URL url) {
+        super(url);
+    }
+
+    /**
+     * Builds the service key from the {@code :path} pseudo header and the service version and
+     * group headers.
+     *
+     * @param data the received message; only {@link Http2Headers} yield a key
+     * @return the service key, or {@code null} when {@code data} is not {@link Http2Headers},
+     *         the path is missing or has no interface segment, or reading the headers fails
+     *         (the failure is logged)
+     */
+    @Override
+    protected ServiceKey getServiceKey(Object data) {
+        if (!(data instanceof Http2Headers)) {
+            return null;
+        }
+
+        try {
+            Http2Headers headers = (Http2Headers) data;
+            CharSequence path = headers.path();
+            if (path == null) {
+                // No :path header: nothing to resolve. Checked explicitly so that a malformed
+                // request does not surface as a NullPointerException logged at error level.
+                return null;
+            }
+            String[] parts = path.toString().split("/"); // path like /{interfaceName}/{methodName}
+            if (parts.length < 2) {
+                // e.g. "/" or "foo": there is no interface segment at index 1
+                return null;
+            }
+            String interfaceName = parts[1];
+            String version = headerValue(headers, TripleHeaderEnum.SERVICE_VERSION.getHeader());
+            String group = headerValue(headers, TripleHeaderEnum.SERVICE_GROUP.getHeader());
+            return new ServiceKey(interfaceName, version, group);
+        } catch (Throwable e) {
+            logger.error("failed to get service key, maybe the build rule for data is wrong, data = " + data, e);
+        }
+
+        return null;
+    }
+
+    /** Returns the value of the named header as a String, or {@code null} when it is absent. */
+    private static String headerValue(Http2Headers headers, String name) {
+        CharSequence value = headers.get(name);
+        return value == null ? null : value.toString();
+    }
+
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleIsolationExecutorSupportFactory.java
similarity index 58%
copy from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
copy to
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleIsolationExecutorSupportFactory.java
index b9496c0c1a..d882396c85 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleIsolationExecutorSupportFactory.java
@@ -14,24 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.dubbo.rpc.protocol.tri.transport;
-package org.apache.dubbo.rpc.protocol.tri.stream;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.executor.ExecutorSupport;
+import org.apache.dubbo.rpc.executor.IsolationExecutorSupportFactory;
-import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-
-import java.util.concurrent.Executor;
-
-/**
- * An abstract stream implementation.
- */
-public abstract class AbstractStream implements Stream {
-
- protected final Executor executor;
- protected final FrameworkModel frameworkModel;
-
- public AbstractStream(Executor executor, FrameworkModel frameworkModel) {
- this.executor = new SerializingExecutor(executor);
- this.frameworkModel = frameworkModel;
+/**
+ * {@link IsolationExecutorSupportFactory} implementation that hands out a new
+ * {@link TripleIsolationExecutorSupport} for the given URL on every call.
+ */
+public class TripleIsolationExecutorSupportFactory implements
IsolationExecutorSupportFactory {
+ @Override
+ public ExecutorSupport createIsolationExecutorSupport(URL url) {
+ return new TripleIsolationExecutorSupport(url);
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.executor.IsolationExecutorSupportFactory
b/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.executor.IsolationExecutorSupportFactory
new file mode 100644
index 0000000000..b24b714ca7
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.executor.IsolationExecutorSupportFactory
@@ -0,0 +1 @@
+tri=org.apache.dubbo.rpc.protocol.tri.transport.TripleIsolationExecutorSupportFactory
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
index e6fc8adb2c..29a43f4a30 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
@@ -51,9 +51,7 @@ class TripleInvokerTest {
when(connection.getChannel())
.thenReturn(channel);
URL url = URL.valueOf("tri://127.0.0.1:9103/" +
IGreeter.class.getName());
- ExecutorService executorService = url.getOrDefaultApplicationModel()
- .getExtensionLoader(ExecutorRepository.class)
- .getDefaultExtension()
+ ExecutorService executorService =
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel())
.createExecutorIfAbsent(url);
TripleClientCall call = Mockito.mock(TripleClientCall.class);
StreamObserver streamObserver = Mockito.mock(StreamObserver.class);