This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new bc85a3f29a Performance Optimization (#11975)
bc85a3f29a is described below
commit bc85a3f29a01e44fb415b0647ac755c319f3fcac
Author: Albumen Kevin <[email protected]>
AuthorDate: Tue Apr 11 16:10:06 2023 +0800
Performance Optimization (#11975)
---
.../manager/DefaultExecutorRepository.java | 70 ++++++++++++-
.../threadpool/manager/ExecutorRepository.java | 11 +-
.../manager/IsolationExecutorRepository.java | 10 ++
.../org/apache/dubbo/config/ApplicationConfig.java | 20 ++--
.../executor/AbstractIsolationExecutorSupport.java | 10 +-
.../rpc/model/FrameworkServiceRepository.java | 13 ---
.../common/extension/ExtensionLoaderTest.java | 6 +-
.../rpc/model/FrameworkServiceRepositoryTest.java | 8 --
.../org/apache/dubbo/config/ServiceConfig.java | 1 +
.../apache/dubbo/config/ApplicationConfigTest.java | 5 +-
.../dubbo/remoting/transport/CodecSupport.java | 35 -------
.../transport/dispatcher/ChannelHandlers.java | 2 +-
.../org/apache/dubbo/remoting/utils/UrlUtils.java | 3 +
.../support/header/HeartbeatHandlerTest.java | 20 +++-
.../transport/netty/ClientReconnectTest.java | 15 ++-
.../remoting/transport/netty/NettyClientTest.java | 38 +++++--
.../transport/netty/NettyClientToServerTest.java | 14 +++
.../remoting/transport/netty/NettyStringTest.java | 18 +++-
.../remoting/transport/netty/ThreadNameTest.java | 17 +--
.../transport/netty4/ClientReconnectTest.java | 32 +++++-
.../remoting/transport/netty4/ConnectionTest.java | 19 +++-
.../transport/netty4/NettyClientToServerTest.java | 33 +++++-
.../transport/netty4/NettyTransporterTest.java | 29 +++++-
.../netty4/PortUnificationExchangerTest.java | 19 +++-
.../netty4/PortUnificationServerTest.java | 19 +++-
.../transport/netty4/ReplierDispatcherTest.java | 20 +++-
.../MultiplexProtocolConnectionManagerTest.java | 20 ++++
.../api/SingleProtocolConnectionManagerTest.java | 19 +++-
...tializer.java => RpcScopeModelInitializer.java} | 7 +-
.../rpc/protocol/PermittedSerializationKeeper.java | 68 ++++++++++++
.../rpc/protocol/ProtocolSerializationWrapper.java | 2 +-
...rg.apache.dubbo.rpc.model.ScopeModelInitializer | 2 +-
.../protocol/dubbo/DecodeableRpcInvocation.java | 18 +++-
.../dubbo/rpc/protocol/dubbo/DubboCodec.java | 36 ++++---
.../dubbo/DubboIsolationExecutorSupport.java | 45 +++++---
.../dubbo/rpc/protocol/dubbo/DubboProtocol.java | 7 +-
.../dubbo/DecodeableRpcInvocationTest.java | 4 +-
.../dubbo/decode/DubboTelnetDecodeTest.java | 11 +-
.../dubbo/managemode}/ChannelHandlersTest.java | 4 +-
.../managemode}/ConnectChannelHandlerTest.java | 4 +-
.../protocol/dubbo/managemode/MockedChannel.java | 115 +++++++++++++++++++++
.../dubbo/managemode/MockedChannelHandler.java | 61 +++++++++++
.../managemode}/WrappedChannelHandlerTest.java | 6 +-
43 files changed, 746 insertions(+), 170 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
index e35617e44c..c792ca07c3 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
@@ -30,11 +30,14 @@ import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.ConsumerConfig;
import org.apache.dubbo.config.ModuleConfig;
import org.apache.dubbo.config.ProviderConfig;
+import org.apache.dubbo.rpc.executor.DefaultExecutorSupport;
+import org.apache.dubbo.rpc.executor.ExecutorSupport;
import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.rpc.model.ModuleModel;
+import org.apache.dubbo.rpc.model.ProviderModel;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
-import org.apache.dubbo.rpc.executor.DefaultExecutorSupport;
-import org.apache.dubbo.rpc.executor.ExecutorSupport;
+import org.apache.dubbo.rpc.model.ServiceModel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -64,6 +67,8 @@ import static
org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_UNEXP
public class DefaultExecutorRepository implements ExecutorRepository,
ExtensionAccessorAware {
private static final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(DefaultExecutorRepository.class);
+ private static final String MAX_KEY = String.valueOf(Integer.MAX_VALUE);
+
private volatile ScheduledExecutorService serviceExportExecutor;
private volatile ExecutorService serviceReferExecutor;
@@ -131,16 +136,34 @@ public class DefaultExecutorRepository implements
ExecutorRepository, ExtensionA
}
}
+ private String getExecutorSecondKey(ServiceModel serviceModel, URL url) {
+ if (serviceModel instanceof ConsumerModel) {
+ return getConsumerKey(serviceModel);
+ } else {
+ return getProviderKey((ProviderModel) serviceModel, url);
+ }
+ }
+
private String getConsumerKey(URL url) {
// Consumer's executor is sharing globally, key=Integer.MAX_VALUE
return String.valueOf(Integer.MAX_VALUE);
}
+ private String getConsumerKey(ServiceModel serviceModel) {
+ // Consumer's executor is sharing globally, key=Integer.MAX_VALUE
+ return MAX_KEY;
+ }
+
protected String getProviderKey(URL url) {
// Provider's executor is sharing by protocol.
return String.valueOf(url.getPort());
}
+ protected String getProviderKey(ProviderModel providerModel, URL url) {
+ // Provider's executor is sharing by protocol.
+ return String.valueOf(url.getPort());
+ }
+
/**
* Return the executor key based on the type (internal or biz) of the
current service.
*
@@ -162,6 +185,18 @@ public class DefaultExecutorRepository implements
ExecutorRepository, ExtensionA
return executorKey;
}
+ private String getExecutorKey(ServiceModel serviceModel) {
+ if (serviceModel.getModuleModel().isInternal()) {
+ return INTERNAL_EXECUTOR_SERVICE_COMPONENT_KEY;
+ }
+
+ if (serviceModel instanceof ProviderModel) {
+ return EXECUTOR_SERVICE_COMPONENT_KEY;
+ } else {
+ return CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY;
+ }
+ }
+
protected ExecutorService createExecutor(URL url) {
return (ExecutorService)
extensionAccessor.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
}
@@ -197,6 +232,37 @@ public class DefaultExecutorRepository implements
ExecutorRepository, ExtensionA
}
}
+ @Override
+ public ExecutorService getExecutor(ServiceModel serviceModel, URL url) {
+ Map<String, ExecutorService> executors =
data.get(getExecutorKey(serviceModel));
+
+ /*
+ * It's guaranteed that this method is called after {@link
#createExecutorIfAbsent(URL)}, so data should already
+ * have Executor instances generated and stored.
+ */
+ if (executors == null) {
+ logger.warn(COMMON_EXECUTORS_NO_FOUND, "", "", "No available
executors, this is not expected, framework should call createExecutorIfAbsent
first" +
+ "before coming to here.");
+
+ return null;
+ }
+
+ // Consumer's executor is sharing globally, key=Integer.MAX_VALUE.
Provider's executor is sharing by protocol.
+ String executorCacheKey = getExecutorSecondKey(serviceModel, url);
+ ExecutorService executor = executors.get(executorCacheKey);
+ if (executor != null && (executor.isShutdown() ||
executor.isTerminated())) {
+ executors.remove(executorCacheKey);
+ // Does not re-create a shutdown executor, use SHARED_EXECUTOR for
downgrade.
+ executor = null;
+ logger.info("Executor for " + url + " is shutdown.");
+ }
+ if (executor == null) {
+ return frameworkExecutorRepository.getSharedExecutor();
+ } else {
+ return executor;
+ }
+ }
+
@Override
public void updateThreadpool(URL url, ExecutorService executor) {
try {
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
index e9e13b77c7..67ab0cf2ab 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
@@ -22,19 +22,20 @@ import org.apache.dubbo.common.extension.ExtensionScope;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.ApplicationConfig;
-import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.executor.ExecutorSupport;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ServiceModel;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
-import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_ISOLATION;
/**
*
*/
-@SPI(value = "default", scope = ExtensionScope.APPLICATION)
+@SPI(value = "isolation", scope = ExtensionScope.APPLICATION)
public interface ExecutorRepository {
/**
@@ -54,6 +55,8 @@ public interface ExecutorRepository {
*/
ExecutorService getExecutor(URL url);
+ ExecutorService getExecutor(ServiceModel serviceModel, URL url);
+
/**
@@ -187,7 +190,7 @@ public interface ExecutorRepository {
static String getMode(ApplicationModel applicationModel) {
Optional<ApplicationConfig> optional =
applicationModel.getApplicationConfigManager().getApplication();
- return
optional.map(ApplicationConfig::getExecutorManagementMode).orElse(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+ return
optional.map(ApplicationConfig::getExecutorManagementMode).orElse(EXECUTOR_MANAGEMENT_MODE_ISOLATION);
}
}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/IsolationExecutorRepository.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/IsolationExecutorRepository.java
index d507992071..0b6d66fe3e 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/IsolationExecutorRepository.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/IsolationExecutorRepository.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.executor.ExecutorSupport;
import org.apache.dubbo.rpc.executor.IsolationExecutorSupportFactory;
import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ProviderModel;
import java.util.concurrent.ExecutorService;
@@ -52,6 +53,15 @@ public class IsolationExecutorRepository extends
DefaultExecutorRepository {
}
}
+ @Override
+ protected String getProviderKey(ProviderModel providerModel, URL url) {
+ if (url.getAttributes().containsKey(SERVICE_EXECUTOR)) {
+ return providerModel.getServiceKey();
+ } else {
+ return super.getProviderKey(url);
+ }
+ }
+
@Override
protected ExecutorService createExecutor(URL url) {
Object executor = url.getAttributes().get(SERVICE_EXECUTOR);
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/config/ApplicationConfig.java
b/dubbo-common/src/main/java/org/apache/dubbo/config/ApplicationConfig.java
index 78198f2462..63dcfb5375 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/ApplicationConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/ApplicationConfig.java
@@ -16,14 +16,6 @@
*/
package org.apache.dubbo.config;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.dubbo.common.compiler.support.AdaptiveCompiler;
import org.apache.dubbo.common.infra.InfraAdapter;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
@@ -33,6 +25,14 @@ import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.support.Parameter;
import org.apache.dubbo.rpc.model.ApplicationModel;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import static
org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
import static
org.apache.dubbo.common.constants.CommonConstants.APPLICATION_PROTOCOL_KEY;
import static
org.apache.dubbo.common.constants.CommonConstants.APPLICATION_VERSION_KEY;
@@ -40,7 +40,7 @@ import static
org.apache.dubbo.common.constants.CommonConstants.DUBBO;
import static org.apache.dubbo.common.constants.CommonConstants.DUMP_DIRECTORY;
import static org.apache.dubbo.common.constants.CommonConstants.DUMP_ENABLE;
import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE;
-import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_ISOLATION;
import static org.apache.dubbo.common.constants.CommonConstants.HOST_KEY;
import static
org.apache.dubbo.common.constants.CommonConstants.LIVENESS_PROBE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.METADATA_KEY;
@@ -274,7 +274,7 @@ public class ApplicationConfig extends AbstractConfig {
}
}
if (executorManagementMode == null) {
- executorManagementMode = EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+ executorManagementMode = EXECUTOR_MANAGEMENT_MODE_ISOLATION;
}
if (enableFileCache == null) {
enableFileCache = Boolean.TRUE;
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/AbstractIsolationExecutorSupport.java
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/AbstractIsolationExecutorSupport.java
index a8a71572d0..2a228d3cc5 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/AbstractIsolationExecutorSupport.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/executor/AbstractIsolationExecutorSupport.java
@@ -50,15 +50,17 @@ public abstract class AbstractIsolationExecutorSupport
implements ExecutorSuppor
for (URL serviceUrl : serviceUrls) {
if (serviceUrl.getProtocol().equals(url.getProtocol()) &&
serviceUrl.getPort() == url.getPort()) {
- return executorRepository.getExecutor(serviceUrl);
+ return executorRepository.getExecutor(providerModel,
serviceUrl);
}
}
- return executorRepository.getExecutor(serviceUrls.get(0));
+ return executorRepository.getExecutor(providerModel,
serviceUrls.get(0));
}
- protected abstract ServiceKey getServiceKey(Object data);
+ protected ServiceKey getServiceKey(Object data) {
+ return null;
+ }
- private ProviderModel getProviderModel(Object data) {
+ protected ProviderModel getProviderModel(Object data) {
ServiceKey serviceKey = getServiceKey(data);
if (serviceKey == null) {
return null;
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/FrameworkServiceRepository.java
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/FrameworkServiceRepository.java
index 5d8df222e8..7aff29de0c 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/FrameworkServiceRepository.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/FrameworkServiceRepository.java
@@ -16,7 +16,6 @@
*/
package org.apache.dubbo.rpc.model;
-import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.StringUtils;
@@ -45,9 +44,6 @@ public class FrameworkServiceRepository {
// useful to find a provider model quickly with
serviceInterfaceName:version
private final ConcurrentMap<String, List<ProviderModel>>
providersWithoutGroup = new ConcurrentHashMap<>();
- // useful to find a url quickly with serviceInterfaceName:version
- private final ConcurrentMap<String, List<URL>> providerUrlsWithoutGroup =
new ConcurrentHashMap<>();
-
public FrameworkServiceRepository(FrameworkModel frameworkModel) {
this.frameworkModel = frameworkModel;
}
@@ -67,7 +63,6 @@ public class FrameworkServiceRepository {
providers.remove(providerModel.getServiceKey());
String keyWithoutGroup =
keyWithoutGroup(providerModel.getServiceKey());
providersWithoutGroup.remove(keyWithoutGroup);
- providerUrlsWithoutGroup.remove(keyWithoutGroup);
}
public ProviderModel lookupExportedServiceWithoutGroup(String key) {
@@ -83,18 +78,10 @@ public class FrameworkServiceRepository {
return providersWithoutGroup.get(key);
}
- public void registerProviderUrl(URL url) {
- ConcurrentHashMapUtils.computeIfAbsent(providerUrlsWithoutGroup,
keyWithoutGroup(url.getServiceKey()), (k) -> new
CopyOnWriteArrayList<>()).add(url);
- }
-
public ProviderModel lookupExportedService(String serviceKey) {
return providers.get(serviceKey);
}
- public List<URL> lookupRegisteredProviderUrlsWithoutGroup(String key) {
- return providerUrlsWithoutGroup.get(key);
- }
-
public List<ProviderModel> allProviderModels() {
return Collections.unmodifiableList(new
ArrayList<>(providers.values()));
}
diff --git
a/dubbo-common/src/test/java/org/apache/dubbo/common/extension/ExtensionLoaderTest.java
b/dubbo-common/src/test/java/org/apache/dubbo/common/extension/ExtensionLoaderTest.java
index 5b7c7d938f..4f8f1474f1 100644
---
a/dubbo-common/src/test/java/org/apache/dubbo/common/extension/ExtensionLoaderTest.java
+++
b/dubbo-common/src/test/java/org/apache/dubbo/common/extension/ExtensionLoaderTest.java
@@ -720,7 +720,7 @@ class ExtensionLoaderTest {
void testDuplicatedImplWithoutOverriddenStrategy() {
List<LoadingStrategy> loadingStrategies =
ExtensionLoader.getLoadingStrategies();
ExtensionLoader.setLoadingStrategies(new
DubboExternalLoadingStrategyTest(false),
- new DubboInternalLoadingStrategyTest(false));
+ new DubboInternalLoadingStrategyTest(false));
ExtensionLoader<DuplicatedWithoutOverriddenExt> extensionLoader =
getExtensionLoader(DuplicatedWithoutOverriddenExt.class);
try {
extensionLoader.getExtension("duplicated");
@@ -738,7 +738,7 @@ class ExtensionLoaderTest {
void testDuplicatedImplWithOverriddenStrategy() {
List<LoadingStrategy> loadingStrategies =
ExtensionLoader.getLoadingStrategies();
ExtensionLoader.setLoadingStrategies(new
DubboExternalLoadingStrategyTest(true),
- new DubboInternalLoadingStrategyTest(true));
+ new DubboInternalLoadingStrategyTest(true));
ExtensionLoader<DuplicatedOverriddenExt> extensionLoader =
getExtensionLoader(DuplicatedOverriddenExt.class);
DuplicatedOverriddenExt duplicatedOverriddenExt =
extensionLoader.getExtension("duplicated");
assertEquals("DuplicatedOverriddenExt1",
duplicatedOverriddenExt.echo());
@@ -832,4 +832,4 @@ class ExtensionLoaderTest {
return MAX_PRIORITY;
}
}
-}
\ No newline at end of file
+}
diff --git
a/dubbo-common/src/test/java/org/apache/dubbo/rpc/model/FrameworkServiceRepositoryTest.java
b/dubbo-common/src/test/java/org/apache/dubbo/rpc/model/FrameworkServiceRepositoryTest.java
index 96909e6529..408fbc13c5 100644
---
a/dubbo-common/src/test/java/org/apache/dubbo/rpc/model/FrameworkServiceRepositoryTest.java
+++
b/dubbo-common/src/test/java/org/apache/dubbo/rpc/model/FrameworkServiceRepositoryTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.dubbo.rpc.model;
-import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.ClassUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.support.DemoService;
@@ -82,12 +81,6 @@ class FrameworkServiceRepositoryTest {
Assertions.assertEquals(providerModels.size(), 1);
Assertions.assertEquals(providerModels.get(0), providerModel);
- URL url = URL.valueOf("test://127.0.0.1:9103/" +
DemoService.class.getName() + "?group=GROUP&version=1.0.0");
- frameworkServiceRepository.registerProviderUrl(url);
- List<URL> urls =
frameworkServiceRepository.lookupRegisteredProviderUrlsWithoutGroup(keyWithoutGroup(url.getServiceKey()));
- Assertions.assertEquals(urls.size(), 1);
- Assertions.assertEquals(urls.get(0), url);
-
ConsumerModel consumerModel = new ConsumerModel(
serviceMetadata.getServiceKey(), new DemoServiceImpl(),
serviceDescriptor,
moduleModel, serviceMetadata, null,
ClassUtils.getClassLoader(DemoService.class));
@@ -99,7 +92,6 @@ class FrameworkServiceRepositoryTest {
frameworkServiceRepository.unregisterProvider(providerModel);
Assertions.assertNull(frameworkServiceRepository.lookupExportedService(serviceKey));
Assertions.assertNull(frameworkServiceRepository.lookupExportedServiceWithoutGroup(keyWithoutGroup));
-
Assertions.assertNull(frameworkServiceRepository.lookupRegisteredProviderUrlsWithoutGroup(keyWithoutGroup));
}
diff --git
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
index 7fdc7e7424..61784867c3 100644
---
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
+++
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
@@ -471,6 +471,7 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
* Because executor is not a string type, it cannot be attached to
the url parameter, so it is added to URL#attributes
* and obtained it in IsolationExecutorRepository#createExecutor
method
*/
+ providerModel.getServiceMetadata().addAttribute(SERVICE_EXECUTOR,
getExecutor());
url.getAttributes().put(SERVICE_EXECUTOR, getExecutor());
}
}
diff --git
a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/ApplicationConfigTest.java
b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/ApplicationConfigTest.java
index ee7be3f03c..a6ec4dac6e 100644
---
a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/ApplicationConfigTest.java
+++
b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/ApplicationConfigTest.java
@@ -18,7 +18,6 @@
package org.apache.dubbo.config;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
-
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -31,7 +30,7 @@ import java.util.Map;
import static
org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
import static org.apache.dubbo.common.constants.CommonConstants.DUMP_DIRECTORY;
-import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_ISOLATION;
import static org.apache.dubbo.common.constants.QosConstants.ACCEPT_FOREIGN_IP;
import static org.apache.dubbo.common.constants.QosConstants.QOS_ENABLE;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -346,7 +345,7 @@ class ApplicationConfigTest {
ApplicationConfig applicationConfig =
DubboBootstrap.getInstance().getApplication();
Assertions.assertEquals(DUBBO, applicationConfig.getProtocol());
- Assertions.assertEquals(EXECUTOR_MANAGEMENT_MODE_DEFAULT,
applicationConfig.getExecutorManagementMode());
+ Assertions.assertEquals(EXECUTOR_MANAGEMENT_MODE_ISOLATION,
applicationConfig.getExecutorManagementMode());
Assertions.assertEquals(Boolean.TRUE,
applicationConfig.getEnableFileCache());
DubboBootstrap.getInstance().destroy();
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
index 416cd55441..d109e97cb1 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
@@ -24,11 +24,9 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.serialize.ObjectInput;
import org.apache.dubbo.common.serialize.ObjectOutput;
import org.apache.dubbo.common.serialize.Serialization;
-import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.remoting.utils.UrlUtils;
import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.FrameworkServiceRepository;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -36,13 +34,11 @@ import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import static org.apache.dubbo.common.BaseServiceMetadata.keyWithoutGroup;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_SERIALIZATION;
public class CodecSupport {
@@ -164,37 +160,6 @@ public class CodecSupport {
return Arrays.equals(payload,
getNullBytesOf(getSerializationById(proto)));
}
- public static void checkSerialization(FrameworkServiceRepository
serviceRepository, String path, String version, Byte id) throws IOException {
- List<URL> urls =
serviceRepository.lookupRegisteredProviderUrlsWithoutGroup(keyWithoutGroup(path,
version));
- if (CollectionUtils.isEmpty(urls)) {
- throw new IOException("Service " + path + " with version " +
version + " not found, invocation rejected.");
- } else {
- boolean match = urls.stream().anyMatch(url -> isMatch(url, id));
- if (!match) {
- throw new IOException("Unexpected serialization id:" + id + "
received from network, please check if the peer send the right id.");
- }
- }
-
- }
-
- /**
- * Is Match
- *
- * @param url url
- * @param id id
- * @return boolean
- */
- private static boolean isMatch(URL url, Byte id) {
- Byte localId;
- for (String serialization : UrlUtils.allSerializations(url)) {
- localId = SERIALIZATIONNAME_ID_MAP.get(serialization);
- if (id.equals(localId)) {
- return true;
- }
- }
- return false;
- }
-
public static void checkSerialization(String requestSerializeName, URL
url) throws IOException {
Collection<String> all = UrlUtils.allSerializations(url);
checkSerialization(requestSerializeName, all);
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/ChannelHandlers.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/ChannelHandlers.java
index 42fdcaf3dd..f11163515a 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/ChannelHandlers.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/ChannelHandlers.java
@@ -33,7 +33,7 @@ public class ChannelHandlers {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
- protected static ChannelHandlers getInstance() {
+ public static ChannelHandlers getInstance() {
return INSTANCE;
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/utils/UrlUtils.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/utils/UrlUtils.java
index 983b5b1927..56b8bac3a9 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/utils/UrlUtils.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/utils/UrlUtils.java
@@ -34,6 +34,8 @@ import static
org.apache.dubbo.remoting.Constants.PREFER_SERIALIZATION_KEY;
import static org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY;
public class UrlUtils {
+ private static final String ALLOWED_SERIALIZATION_KEY =
"allowedSerialization";
+
public static int getIdleTimeout(URL url) {
int heartBeat = getHeartbeat(url);
// idleTimeout should be at least more than twice heartBeat because
possible retries of client.
@@ -91,6 +93,7 @@ public class UrlUtils {
* @param url url
* @return {@link List}<{@link String}>
*/
+ @SuppressWarnings("unchecked")
public static Collection<String> allSerializations(URL url) {
// preferSerialization -> serialization -> default serialization
Set<String> serializations = new
LinkedHashSet<>(preferSerialization(url));
diff --git
a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java
b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java
index d457f5e33a..608a530731 100644
---
a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java
+++
b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
@@ -30,7 +31,7 @@ import org.apache.dubbo.remoting.exchange.ExchangeHandler;
import org.apache.dubbo.remoting.exchange.ExchangeServer;
import org.apache.dubbo.remoting.exchange.Exchangers;
import org.apache.dubbo.remoting.transport.dispatcher.FakeChannelHandlers;
-
+import org.apache.dubbo.rpc.model.ApplicationModel;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -38,6 +39,8 @@ import org.junit.jupiter.api.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+
class HeartbeatHandlerTest {
private static final Logger logger =
LoggerFactory.getLogger(HeartbeatHandlerTest.class);
@@ -72,6 +75,11 @@ class HeartbeatHandlerTest {
.addParameter(Constants.HEARTBEAT_KEY, 1000);
CountDownLatch connect = new CountDownLatch(1);
CountDownLatch disconnect = new CountDownLatch(1);
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ serverURL = serverURL.setScopeModel(applicationModel);
TestHeartbeatHandler handler = new TestHeartbeatHandler(connect,
disconnect);
server = Exchangers.bind(serverURL, handler);
System.out.println("Server bind successfully");
@@ -97,6 +105,11 @@ class HeartbeatHandlerTest {
.addParameter(Constants.TRANSPORTER_KEY, "netty3")
.addParameter(Constants.HEARTBEAT_KEY, 1000)
.addParameter(Constants.CODEC_KEY, "telnet");
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ serverURL = serverURL.setScopeModel(applicationModel);
CountDownLatch connect = new CountDownLatch(1);
CountDownLatch disconnect = new CountDownLatch(1);
TestHeartbeatHandler handler = new TestHeartbeatHandler(connect,
disconnect);
@@ -118,6 +131,11 @@ class HeartbeatHandlerTest {
.addParameter(Constants.EXCHANGER_KEY, HeaderExchanger.NAME)
.addParameter(Constants.TRANSPORTER_KEY, "netty3")
.addParameter(Constants.CODEC_KEY, "telnet");
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ serverURL = serverURL.setScopeModel(applicationModel);
CountDownLatch connect = new CountDownLatch(1);
CountDownLatch disconnect = new CountDownLatch(1);
TestHeartbeatHandler handler = new TestHeartbeatHandler(connect,
disconnect);
diff --git
a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientReconnectTest.java
b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientReconnectTest.java
index a90828dbad..f4229aa56d 100644
---
a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientReconnectTest.java
+++
b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientReconnectTest.java
@@ -16,8 +16,10 @@
*/
package org.apache.dubbo.remoting.transport.netty;
+import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.DubboAppender;
import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.Client;
import org.apache.dubbo.remoting.Constants;
@@ -25,12 +27,14 @@ import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.RemotingServer;
import org.apache.dubbo.remoting.exchange.Exchangers;
import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter;
+import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
-
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+
/**
* Client reconnect test
*/
@@ -71,8 +75,13 @@ class ClientReconnectTest {
public Client startClient(int port, int heartbeat) throws
RemotingException {
- final String url = "exchange://127.0.0.1:" + port +
"/client.reconnect.test?check=false&codec=exchange&client=netty3&" +
- Constants.HEARTBEAT_KEY + "=" + heartbeat;
+ URL url = URL.valueOf("exchange://127.0.0.1:" + port +
"/client.reconnect.test?check=false&codec=exchange&client=netty3&" +
+ Constants.HEARTBEAT_KEY + "=" + heartbeat);
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ url = url.setScopeModel(applicationModel);
return Exchangers.connect(url);
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/NettyClientTest.java
b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/NettyClientTest.java
index 675a1474a0..4b14d0f4b1 100644
---
a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/NettyClientTest.java
+++
b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/NettyClientTest.java
@@ -18,11 +18,11 @@ package org.apache.dubbo.remoting.transport.netty;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.NetUtils;
-import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.remoting.RemotingServer;
import org.apache.dubbo.remoting.exchange.ExchangeChannel;
import org.apache.dubbo.remoting.exchange.Exchangers;
-
+import org.apache.dubbo.rpc.model.ApplicationModel;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -30,6 +30,8 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+
/**
* Date: 5/3/11
* Time: 5:47 PM
@@ -40,7 +42,13 @@ class NettyClientTest {
@BeforeAll
public static void setUp() throws Exception {
- server = Exchangers.bind(URL.valueOf("exchange://localhost:" + port +
"?server=netty3&codec=exchange"), new TelnetServerHandler());
+ URL url = URL.valueOf("exchange://localhost:" + port +
"?server=netty3&codec=exchange");
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ url = url.setScopeModel(applicationModel);
+ server = Exchangers.bind(url, new TelnetServerHandler());
}
@AfterAll
@@ -52,16 +60,22 @@ class NettyClientTest {
}
}
- public static void main(String[] args) throws RemotingException,
InterruptedException {
- ExchangeChannel client =
Exchangers.connect(URL.valueOf("exchange://10.20.153.10:20880?client=netty3&heartbeat=1000&codec=exchange"));
- Thread.sleep(60 * 1000 * 50);
- }
+// public static void main(String[] args) throws RemotingException,
InterruptedException {
+// ExchangeChannel client =
Exchangers.connect(URL.valueOf("exchange://10.20.153.10:20880?client=netty3&heartbeat=1000&codec=exchange"));
+// Thread.sleep(60 * 1000 * 50);
+// }
@Test
void testClientClose() throws Exception {
List<ExchangeChannel> clients = new ArrayList<ExchangeChannel>(100);
for (int i = 0; i < 100; i++) {
- ExchangeChannel client =
Exchangers.connect(URL.valueOf("exchange://localhost:" + port +
"?client=netty3&codec=exchange"));
+ URL url = URL.valueOf("exchange://localhost:" + port +
"?client=netty3&codec=exchange");
+ ApplicationModel applicationModel =
ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ url = url.setScopeModel(applicationModel);
+ ExchangeChannel client = Exchangers.connect(url);
Thread.sleep(5);
clients.add(client);
}
@@ -74,7 +88,13 @@ class NettyClientTest {
@Test
void testServerClose() throws Exception {
for (int i = 0; i < 100; i++) {
- RemotingServer aServer =
Exchangers.bind(URL.valueOf("exchange://localhost:" +
NetUtils.getAvailablePort(6000) + "?server=netty3&codec=exchange"), new
TelnetServerHandler());
+ URL url = URL.valueOf("exchange://localhost:" +
NetUtils.getAvailablePort(6000) + "?server=netty3&codec=exchange");
+ ApplicationModel applicationModel =
ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ url = url.setScopeModel(applicationModel);
+ RemotingServer aServer = Exchangers.bind(url, new
TelnetServerHandler());
aServer.close();
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/NettyClientToServerTest.java
b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/NettyClientToServerTest.java
index 4a871e0fb7..57131fcbc6 100644
---
a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/NettyClientToServerTest.java
+++
b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/NettyClientToServerTest.java
@@ -17,12 +17,16 @@
package org.apache.dubbo.remoting.transport.netty;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.ExchangeChannel;
import org.apache.dubbo.remoting.exchange.ExchangeServer;
import org.apache.dubbo.remoting.exchange.Exchangers;
import org.apache.dubbo.remoting.exchange.support.Replier;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
/**
* NettyClientToServerTest
@@ -33,6 +37,11 @@ class NettyClientToServerTest extends ClientToServerTest {
// add heartbeat cycle to avoid unstable ut.
URL url = URL.valueOf("exchange://localhost:" + port +
"?server=netty3&codec=exchange");
url = url.addParameter(Constants.HEARTBEAT_KEY, 600 * 1000);
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ url = url.setScopeModel(applicationModel);
return Exchangers.bind(url, receiver);
}
@@ -40,6 +49,11 @@ class NettyClientToServerTest extends ClientToServerTest {
// add heartbeat cycle to avoid unstable ut.
URL url = URL.valueOf("exchange://localhost:" + port +
"?client=netty3&timeout=3000&codec=exchange");
url = url.addParameter(Constants.HEARTBEAT_KEY, 600 * 1000);
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ url = url.setScopeModel(applicationModel);
return Exchangers.connect(url);
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/NettyStringTest.java
b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/NettyStringTest.java
index d1221f8ec0..8b1fde4034 100644
---
a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/NettyStringTest.java
+++
b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/NettyStringTest.java
@@ -18,14 +18,17 @@ package org.apache.dubbo.remoting.transport.netty;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.remoting.exchange.ExchangeChannel;
import org.apache.dubbo.remoting.exchange.ExchangeServer;
import org.apache.dubbo.remoting.exchange.Exchangers;
-
+import org.apache.dubbo.rpc.model.ApplicationModel;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+
/**
* Date: 4/26/11
* Time: 4:13 PM
@@ -40,8 +43,17 @@ class NettyStringTest {
//int port = 10001;
int port = NetUtils.getAvailablePort();
System.out.println(port);
- server = Exchangers.bind(URL.valueOf("telnet://0.0.0.0:" + port +
"?server=netty3&codec=telnet"), new TelnetServerHandler());
- client = Exchangers.connect(URL.valueOf("telnet://127.0.0.1:" + port +
"?client=netty3&codec=telnet"), new TelnetClientHandler());
+ URL serverURL = URL.valueOf("telnet://0.0.0.0:" + port +
"?server=netty3&codec=telnet");
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ serverURL = serverURL.setScopeModel(applicationModel);
+
+ URL clientURL = URL.valueOf("telnet://127.0.0.1:" + port +
"?client=netty3&codec=telnet");
+ clientURL = clientURL.setScopeModel(applicationModel);
+ server = Exchangers.bind(serverURL, new TelnetServerHandler());
+ client = Exchangers.connect(clientURL, new TelnetClientHandler());
}
@AfterAll
diff --git
a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ThreadNameTest.java
b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ThreadNameTest.java
index d990649654..203cc35353 100644
---
a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ThreadNameTest.java
+++
b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ThreadNameTest.java
@@ -18,11 +18,11 @@ package org.apache.dubbo.remoting.transport.netty;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.rpc.model.ApplicationModel;
-
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -32,6 +32,8 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+
class ThreadNameTest {
private NettyServer server;
@@ -53,12 +55,15 @@ class ThreadNameTest {
public void before() throws Exception {
int port = NetUtils.getAvailablePort(20880 + new
Random().nextInt(10000));
serverURL =
URL.valueOf("telnet://localhost?side=provider&codec=telnet")
- .setPort(port)
- .setScopeModel(ApplicationModel.defaultModel());
+ .setPort(port);
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ serverURL = serverURL.setScopeModel(applicationModel);
clientURL =
URL.valueOf("telnet://localhost?side=consumer&codec=telnet")
- .setPort(port)
- .setScopeModel(ApplicationModel.defaultModel());
-
+ .setPort(port);
+ clientURL = clientURL.setScopeModel(applicationModel);
serverHandler = new ThreadNameVerifyHandler(serverRegex, false,
serverLatch);
clientHandler = new ThreadNameVerifyHandler(clientRegex, true,
clientLatch);
server = new NettyServer(serverURL, serverHandler);
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientReconnectTest.java
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientReconnectTest.java
index 03918da077..369108ea6b 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientReconnectTest.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientReconnectTest.java
@@ -16,8 +16,12 @@
*/
package org.apache.dubbo.remoting.transport.netty4;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.utils.DubboAppender;
import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.context.ConfigManager;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.Client;
import org.apache.dubbo.remoting.Constants;
@@ -25,12 +29,15 @@ import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.RemotingServer;
import org.apache.dubbo.remoting.exchange.Exchangers;
import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter;
+import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
-
+import org.apache.dubbo.rpc.model.ModuleModel;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+
/**
* Client reconnect test
*/
@@ -74,12 +81,31 @@ class ClientReconnectTest {
public Client startClient(int port, int heartbeat) throws
RemotingException {
- final String url = "exchange://127.0.0.1:" + port +
"/client.reconnect.test?client=netty4&check=false&" + Constants.HEARTBEAT_KEY +
"=" + heartbeat;
+ URL url = URL.valueOf("exchange://127.0.0.1:" + port +
"/client.reconnect.test?client=netty4&check=false&" + Constants.HEARTBEAT_KEY +
"=" + heartbeat);
+ FrameworkModel frameworkModel = new FrameworkModel();
+ ApplicationModel applicationModel = frameworkModel.newApplication();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+ ConfigManager configManager = new ConfigManager(applicationModel);
+ configManager.setApplication(applicationConfig);
+ configManager.getApplication();
+ applicationModel.setConfigManager(configManager);
+ url = url.putAttribute(CommonConstants.SCOPE_MODEL, applicationModel);
return Exchangers.connect(url);
}
public RemotingServer startServer(int port) throws RemotingException {
- final String url = "exchange://127.0.0.1:" + port +
"/client.reconnect.test?server=netty4";
+ URL url = URL.valueOf("exchange://127.0.0.1:" + port +
"/client.reconnect.test?server=netty4");
+ FrameworkModel frameworkModel = new FrameworkModel();
+ ApplicationModel applicationModel = frameworkModel.newApplication();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+ ConfigManager configManager = new ConfigManager(applicationModel);
+ configManager.setApplication(applicationConfig);
+ configManager.getApplication();
+ applicationModel.setConfigManager(configManager);
+ ModuleModel moduleModel = applicationModel.getDefaultModule();
+ url = url.putAttribute(CommonConstants.SCOPE_MODEL, moduleModel);
return Exchangers.bind(url, new HandlerAdapter());
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
index 0c5d8c4845..2c230d6c61 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
@@ -17,13 +17,17 @@
package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.context.ConfigManager;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
import org.apache.dubbo.remoting.api.connection.ConnectionManager;
import
org.apache.dubbo.remoting.api.connection.MultiplexProtocolConnectionManager;
import org.apache.dubbo.remoting.api.pu.DefaultPuHandler;
-
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ModuleModel;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -34,6 +38,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+
public class ConnectionTest {
@@ -47,6 +53,17 @@ public class ConnectionTest {
public static void init() throws RemotingException {
int port = NetUtils.getAvailablePort();
url = URL.valueOf("empty://127.0.0.1:" + port + "?foo=bar");
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ ConfigManager configManager = new ConfigManager(applicationModel);
+ configManager.setApplication(applicationConfig);
+ configManager.getApplication();
+ applicationModel.setConfigManager(configManager);
+ url = url.setScopeModel(applicationModel);
+ ModuleModel moduleModel = applicationModel.getDefaultModule();
+ url = url.putAttribute(CommonConstants.SCOPE_MODEL, moduleModel);
server = new NettyPortUnificationServer(url, new DefaultPuHandler());
server.bind();
connectionManager =
url.getOrDefaultFrameworkModel().getExtensionLoader(ConnectionManager.class).getExtension(MultiplexProtocolConnectionManager.NAME);
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientToServerTest.java
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientToServerTest.java
index 79fe9b2571..2f0f6e03fb 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientToServerTest.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientToServerTest.java
@@ -17,12 +17,19 @@
package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.context.ConfigManager;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.ExchangeChannel;
import org.apache.dubbo.remoting.exchange.ExchangeServer;
import org.apache.dubbo.remoting.exchange.Exchangers;
import org.apache.dubbo.remoting.exchange.support.Replier;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ModuleModel;
+
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
/**
* Netty4ClientToServerTest
@@ -32,7 +39,20 @@ class NettyClientToServerTest extends ClientToServerTest {
protected ExchangeServer newServer(int port, Replier<?> receiver) throws
RemotingException {
// add heartbeat cycle to avoid unstable ut.
URL url = URL.valueOf("exchange://localhost:" + port +
"?server=netty4");
- url = url.addParameter(Constants.HEARTBEAT_KEY, 600 * 1000);
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ ConfigManager configManager = new ConfigManager(applicationModel);
+ configManager.setApplication(applicationConfig);
+ configManager.getApplication();
+ applicationModel.setConfigManager(configManager);
+ url = url.addParameter(Constants.HEARTBEAT_KEY, 600 *
1000).putAttribute(CommonConstants.SCOPE_MODEL, applicationModel);
+ url = url.setScopeModel(applicationModel);
+// ModuleModel moduleModel = applicationModel.getDefaultModule();
+
+ ModuleModel moduleModel = applicationModel.getDefaultModule();
+ url = url.putAttribute(CommonConstants.SCOPE_MODEL, moduleModel);
return Exchangers.bind(url, receiver);
}
@@ -40,6 +60,17 @@ class NettyClientToServerTest extends ClientToServerTest {
// add heartbeat cycle to avoid unstable ut.
URL url = URL.valueOf("exchange://localhost:" + port +
"?client=netty4&timeout=3000");
url = url.addParameter(Constants.HEARTBEAT_KEY, 600 * 1000);
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ ConfigManager configManager = new ConfigManager(applicationModel);
+ configManager.setApplication(applicationConfig);
+ configManager.getApplication();
+ applicationModel.setConfigManager(configManager);
+ url = url.setScopeModel(applicationModel);
+ ModuleModel moduleModel = applicationModel.getDefaultModule();
+ url = url.putAttribute(CommonConstants.SCOPE_MODEL, moduleModel);
return Exchangers.connect(url);
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyTransporterTest.java
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyTransporterTest.java
index 17b0db79f2..772f3d604a 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyTransporterTest.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyTransporterTest.java
@@ -17,18 +17,24 @@
package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.url.component.ServiceConfigURL;
import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.context.ConfigManager;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.RemotingServer;
import org.apache.dubbo.remoting.transport.ChannelHandlerAdapter;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ModuleModel;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CountDownLatch;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -39,6 +45,17 @@ class NettyTransporterTest {
URL url = new ServiceConfigURL("telnet", "localhost", port,
new String[]{Constants.BIND_PORT_KEY, String.valueOf(port)});
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ ConfigManager configManager = new ConfigManager(applicationModel);
+ configManager.setApplication(applicationConfig);
+ configManager.getApplication();
+ applicationModel.setConfigManager(configManager);
+ url = url.setScopeModel(applicationModel);
+ ModuleModel moduleModel = applicationModel.getDefaultModule();
+ url = url.putAttribute(CommonConstants.SCOPE_MODEL, moduleModel);
RemotingServer server = new NettyTransporter().bind(url, new
ChannelHandlerAdapter());
assertThat(server.isBound(), is(true));
@@ -51,7 +68,17 @@ class NettyTransporterTest {
int port = NetUtils.getAvailablePort();
URL url = new ServiceConfigURL("telnet", "localhost", port,
new String[]{Constants.BIND_PORT_KEY, String.valueOf(port)});
-
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ ConfigManager configManager = new ConfigManager(applicationModel);
+ configManager.setApplication(applicationConfig);
+ configManager.getApplication();
+ applicationModel.setConfigManager(configManager);
+ url = url.setScopeModel(applicationModel);
+ ModuleModel moduleModel = applicationModel.getDefaultModule();
+ url = url.putAttribute(CommonConstants.SCOPE_MODEL, moduleModel);
new NettyTransporter().bind(url, new ChannelHandlerAdapter() {
@Override
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/PortUnificationExchangerTest.java
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/PortUnificationExchangerTest.java
index f06c018f3e..faa9de8869 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/PortUnificationExchangerTest.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/PortUnificationExchangerTest.java
@@ -17,16 +17,22 @@
package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.context.ConfigManager;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
import org.apache.dubbo.remoting.api.pu.DefaultPuHandler;
import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
-
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ModuleModel;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+
class PortUnificationExchangerTest {
private static URL url;
@@ -35,6 +41,17 @@ class PortUnificationExchangerTest {
public static void init() throws RemotingException {
int port = NetUtils.getAvailablePort();
url = URL.valueOf("empty://127.0.0.1:" + port + "?foo=bar");
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ ConfigManager configManager = new ConfigManager(applicationModel);
+ configManager.setApplication(applicationConfig);
+ configManager.getApplication();
+ applicationModel.setConfigManager(configManager);
+ url = url.setScopeModel(applicationModel);
+ ModuleModel moduleModel = applicationModel.getDefaultModule();
+ url = url.putAttribute(CommonConstants.SCOPE_MODEL, moduleModel);
}
@Test
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/PortUnificationServerTest.java
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/PortUnificationServerTest.java
index 61273baf4c..0f5268da89 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/PortUnificationServerTest.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/PortUnificationServerTest.java
@@ -17,20 +17,37 @@
package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.context.ConfigManager;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.api.pu.DefaultPuHandler;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ModuleModel;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+
class PortUnificationServerTest {
@Test
void testBind() throws RemotingException {
int port = NetUtils.getAvailablePort();
URL url = URL.valueOf("empty://127.0.0.1:" + port + "?foo=bar");
-
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ ConfigManager configManager = new ConfigManager(applicationModel);
+ configManager.setApplication(applicationConfig);
+ configManager.getApplication();
+ applicationModel.setConfigManager(configManager);
+ url = url.setScopeModel(applicationModel);
+ ModuleModel moduleModel = applicationModel.getDefaultModule();
+ url = url.putAttribute(CommonConstants.SCOPE_MODEL, moduleModel);
// abstract endpoint need to get codec of url(which is in triple
package)
final NettyPortUnificationServer server = new
NettyPortUnificationServer(url, new DefaultPuHandler());
server.bind();
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ReplierDispatcherTest.java
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ReplierDispatcherTest.java
index 5d76aa0099..bc3f5d8cbb 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ReplierDispatcherTest.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ReplierDispatcherTest.java
@@ -19,12 +19,15 @@ package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.context.ConfigManager;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.ExchangeChannel;
import org.apache.dubbo.remoting.exchange.ExchangeServer;
import org.apache.dubbo.remoting.exchange.Exchangers;
import org.apache.dubbo.remoting.exchange.support.ReplierDispatcher;
-
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ModuleModel;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -38,6 +41,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
import static org.junit.jupiter.api.Assertions.fail;
@@ -59,7 +63,19 @@ class ReplierDispatcherTest {
ReplierDispatcher dispatcher = new ReplierDispatcher();
dispatcher.addReplier(RpcMessage.class, new RpcMessageHandler());
dispatcher.addReplier(Data.class, (channel, msg) -> new
StringMessage("hello world"));
- exchangeServer = Exchangers.bind(URL.valueOf("exchange://localhost:" +
port + "?" + CommonConstants.TIMEOUT_KEY + "=60000"), dispatcher);
+ URL url = URL.valueOf("exchange://localhost:" + port + "?" +
CommonConstants.TIMEOUT_KEY + "=60000");
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ ConfigManager configManager = new ConfigManager(applicationModel);
+ configManager.setApplication(applicationConfig);
+ configManager.getApplication();
+ applicationModel.setConfigManager(configManager);
+ url = url.setScopeModel(applicationModel);
+ ModuleModel moduleModel = applicationModel.getDefaultModule();
+ url = url.putAttribute(CommonConstants.SCOPE_MODEL, moduleModel);
+ exchangeServer = Exchangers.bind(url, dispatcher);
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/MultiplexProtocolConnectionManagerTest.java
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/MultiplexProtocolConnectionManagerTest.java
index 634c4a72c1..2bceec7ab6 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/MultiplexProtocolConnectionManagerTest.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/MultiplexProtocolConnectionManagerTest.java
@@ -18,6 +18,9 @@
package org.apache.dubbo.remoting.transport.netty4.api;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.context.ConfigManager;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
import org.apache.dubbo.remoting.api.connection.ConnectionManager;
@@ -25,6 +28,8 @@ import
org.apache.dubbo.remoting.api.connection.MultiplexProtocolConnectionManag
import org.apache.dubbo.remoting.api.pu.DefaultPuHandler;
import org.apache.dubbo.remoting.transport.netty4.NettyPortUnificationServer;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ModuleModel;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -34,6 +39,8 @@ import java.lang.reflect.Field;
import java.util.Map;
import java.util.function.Consumer;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+
public class MultiplexProtocolConnectionManagerTest {
private static URL url1;
@@ -46,8 +53,21 @@ public class MultiplexProtocolConnectionManagerTest {
@BeforeAll
public static void init() throws RemotingException {
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ ConfigManager configManager = new ConfigManager(applicationModel);
+ configManager.setApplication(applicationConfig);
+ configManager.getApplication();
+ applicationModel.setConfigManager(configManager);
url1 = URL.valueOf("empty://127.0.0.1:8080?foo=bar");
url2 = URL.valueOf("tri://127.0.0.1:8081?foo=bar");
+ url1 = url1.setScopeModel(applicationModel);
+ ModuleModel moduleModel = applicationModel.getDefaultModule();
+ url1 = url1.putAttribute(CommonConstants.SCOPE_MODEL, moduleModel);
+ url2 = url2.setScopeModel(applicationModel);
+ url2 = url2.putAttribute(CommonConstants.SCOPE_MODEL, moduleModel);
server = new NettyPortUnificationServer(url1, new DefaultPuHandler());
server.bind();
connectionManager = url1.getOrDefaultFrameworkModel()
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/SingleProtocolConnectionManagerTest.java
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/SingleProtocolConnectionManagerTest.java
index fab70fd76e..bc4d548008 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/SingleProtocolConnectionManagerTest.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/SingleProtocolConnectionManagerTest.java
@@ -18,7 +18,10 @@
package org.apache.dubbo.remoting.transport.netty4.api;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.context.ConfigManager;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
import org.apache.dubbo.remoting.api.connection.ConnectionManager;
@@ -26,7 +29,8 @@ import
org.apache.dubbo.remoting.api.connection.SingleProtocolConnectionManager;
import org.apache.dubbo.remoting.api.pu.DefaultPuHandler;
import org.apache.dubbo.remoting.transport.netty4.NettyConnectionClient;
import org.apache.dubbo.remoting.transport.netty4.NettyPortUnificationServer;
-
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ModuleModel;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -36,6 +40,8 @@ import java.lang.reflect.Field;
import java.util.Map;
import java.util.function.Consumer;
+import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+
public class SingleProtocolConnectionManagerTest {
private static URL url;
@@ -48,6 +54,17 @@ public class SingleProtocolConnectionManagerTest {
public static void init() throws RemotingException {
int port = NetUtils.getAvailablePort();
url = URL.valueOf("empty://127.0.0.1:" + port + "?foo=bar");
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ ApplicationConfig applicationConfig = new
ApplicationConfig("provider-app");
+
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
+
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
+ ConfigManager configManager = new ConfigManager(applicationModel);
+ configManager.setApplication(applicationConfig);
+ configManager.getApplication();
+ applicationModel.setConfigManager(configManager);
+ url = url.setScopeModel(applicationModel);
+ ModuleModel moduleModel = applicationModel.getDefaultModule();
+ url = url.putAttribute(CommonConstants.SCOPE_MODEL, moduleModel);
server = new NettyPortUnificationServer(url, new DefaultPuHandler());
server.bind();
connectionManager = url.getOrDefaultFrameworkModel()
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/listener/ExporterScopeModelInitializer.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcScopeModelInitializer.java
similarity index 83%
rename from
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/listener/ExporterScopeModelInitializer.java
rename to
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcScopeModelInitializer.java
index 87092be949..d246431565 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/listener/ExporterScopeModelInitializer.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcScopeModelInitializer.java
@@ -14,19 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.rpc.listener;
+package org.apache.dubbo.rpc;
import org.apache.dubbo.common.beans.factory.ScopeBeanFactory;
+import org.apache.dubbo.rpc.listener.InjvmExporterListener;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.ModuleModel;
import org.apache.dubbo.rpc.model.ScopeModelInitializer;
+import org.apache.dubbo.rpc.protocol.PermittedSerializationKeeper;
-public class ExporterScopeModelInitializer implements ScopeModelInitializer {
+public class RpcScopeModelInitializer implements ScopeModelInitializer {
@Override
public void initializeFrameworkModel(FrameworkModel frameworkModel) {
ScopeBeanFactory beanFactory = frameworkModel.getBeanFactory();
beanFactory.registerBean(InjvmExporterListener.class);
+ beanFactory.registerBean(PermittedSerializationKeeper.class);
}
@Override
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/PermittedSerializationKeeper.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/PermittedSerializationKeeper.java
new file mode 100644
index 0000000000..7bc8d12017
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/PermittedSerializationKeeper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.transport.CodecSupport;
+import org.apache.dubbo.remoting.utils.UrlUtils;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static
org.apache.dubbo.common.BaseServiceMetadata.interfaceFromServiceKey;
+import static
org.apache.dubbo.common.BaseServiceMetadata.versionFromServiceKey;
+
+public class PermittedSerializationKeeper {
+ private final ConcurrentMap<String, Set<Byte>> serviceToSerializationId =
new ConcurrentHashMap<>();
+ private final Set<Byte> globalPermittedSerializationIds = new
ConcurrentHashSet<>();
+
+ public void registerService(URL url) {
+ Set<Byte> set =
ConcurrentHashMapUtils.computeIfAbsent(serviceToSerializationId,
keyWithoutGroup(url.getServiceKey()), k -> new ConcurrentHashSet<>());
+ Collection<String> serializations = UrlUtils.allSerializations(url);
+ for (String serialization : serializations) {
+ Byte id = CodecSupport.getIDByName(serialization);
+ if (id != null) {
+ set.add(id);
+ globalPermittedSerializationIds.add(id);
+ }
+ }
+ }
+
+ public boolean checkSerializationPermitted(String serviceKeyWithoutGroup,
Byte id) throws IOException {
+ Set<Byte> set = serviceToSerializationId.get(serviceKeyWithoutGroup);
+ if (set == null) {
+ throw new IOException("Service " + serviceKeyWithoutGroup + " not
found, invocation rejected.");
+ }
+ return set.contains(id);
+ }
+
+ private static String keyWithoutGroup(String serviceKey) {
+ String interfaceName = interfaceFromServiceKey(serviceKey);
+ String version = versionFromServiceKey(serviceKey);
+ if (StringUtils.isEmpty(version)) {
+ return interfaceName;
+ }
+ return interfaceName + CommonConstants.GROUP_CHAR_SEPARATOR + version;
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolSerializationWrapper.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolSerializationWrapper.java
index 6507d6a84e..dfaaffc827 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolSerializationWrapper.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolSerializationWrapper.java
@@ -43,7 +43,7 @@ public class ProtocolSerializationWrapper implements Protocol
{
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
-
getFrameworkModel(invoker.getUrl().getScopeModel()).getServiceRepository().registerProviderUrl(invoker.getUrl());
+
getFrameworkModel(invoker.getUrl().getScopeModel()).getBeanFactory().getBean(PermittedSerializationKeeper.class).registerService(invoker.getUrl());
return protocol.export(invoker);
}
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.model.ScopeModelInitializer
b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.model.ScopeModelInitializer
index 4d3c435bef..3c1f139fa0 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.model.ScopeModelInitializer
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.model.ScopeModelInitializer
@@ -1 +1 @@
-injvm=org.apache.dubbo.rpc.listener.ExporterScopeModelInitializer
+rpc=org.apache.dubbo.rpc.RpcScopeModelInitializer
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
index 06b9a5157c..3f2127c0b5 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
@@ -28,6 +28,7 @@ import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.Codec;
import org.apache.dubbo.remoting.Decodeable;
+import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.transport.CodecSupport;
import org.apache.dubbo.remoting.transport.ExceedPayloadLimitException;
@@ -39,6 +40,7 @@ import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ModuleModel;
import org.apache.dubbo.rpc.model.ProviderModel;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
+import org.apache.dubbo.rpc.protocol.PermittedSerializationKeeper;
import org.apache.dubbo.rpc.support.RpcUtils;
import java.io.IOException;
@@ -78,6 +80,8 @@ public class DecodeableRpcInvocation extends RpcInvocation
implements Codec, Dec
protected final transient Supplier<CallbackServiceCodec>
callbackServiceCodecFactory;
+ private static final boolean CHECK_SERIALIZATION =
Boolean.parseBoolean(System.getProperty(SERIALIZATION_SECURITY_CHECK_KEY,
"true"));
+
public DecodeableRpcInvocation(FrameworkModel frameworkModel, Channel
channel, Request request, InputStream is, byte id) {
this.frameworkModel = frameworkModel;
Assert.notNull(channel, "channel == null");
@@ -129,7 +133,8 @@ public class DecodeableRpcInvocation extends RpcInvocation
implements Codec, Dec
setAttachment(VERSION_KEY, version);
// Do provider-level payload checks.
- checkPayload(keyWithoutGroup(path, version));
+ String keyWithoutGroup = keyWithoutGroup(path, version);
+ checkPayload(keyWithoutGroup);
setMethodName(in.readUTF());
@@ -138,8 +143,11 @@ public class DecodeableRpcInvocation extends RpcInvocation
implements Codec, Dec
ClassLoader originClassLoader =
Thread.currentThread().getContextClassLoader();
try {
- if
(Boolean.parseBoolean(System.getProperty(SERIALIZATION_SECURITY_CHECK_KEY,
"true"))) {
-
CodecSupport.checkSerialization(frameworkModel.getServiceRepository(), path,
version, serializationType);
+ if (CHECK_SERIALIZATION) {
+ PermittedSerializationKeeper keeper =
frameworkModel.getBeanFactory().getBean(PermittedSerializationKeeper.class);
+ if (!keeper.checkSerializationPermitted(keyWithoutGroup,
serializationType)) {
+ throw new IOException("Unexpected serialization id:" +
serializationType + " received from network, please check if the peer send the
right id.");
+ }
}
Object[] args = DubboCodec.EMPTY_OBJECT_ARRAY;
Class<?>[] pts = DubboCodec.EMPTY_CLASS_ARRAY;
@@ -274,4 +282,8 @@ public class DecodeableRpcInvocation extends RpcInvocation
implements Codec, Dec
}
}
}
+
+ protected void fillInvoker(DubboProtocol dubboProtocol) throws
RemotingException {
+ this.setInvoker(dubboProtocol.getInvoker(channel, this));
+ }
}
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
index 44bc7cd700..9f3a3351af 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
@@ -45,14 +45,13 @@ import java.io.InputStream;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
-import static
org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
import static
org.apache.dubbo.common.constants.CommonConstants.BYTE_ACCESSOR_KEY;
+import static
org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_ISOLATION;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_DECODE;
-import static
org.apache.dubbo.rpc.protocol.dubbo.Constants.DECODE_IN_IO_THREAD_KEY;
import static
org.apache.dubbo.rpc.protocol.dubbo.Constants.DEFAULT_DECODE_IN_IO_THREAD;
/**
@@ -76,6 +75,7 @@ public class DubboCodec extends ExchangeCodec {
private final CallbackServiceCodec callbackServiceCodec;
private final FrameworkModel frameworkModel;
private final ByteAccessor customByteAccessor;
+ private static final String DECODE_IN_IO_THREAD_KEY =
"decode.in.io.thread";
public DubboCodec(FrameworkModel frameworkModel) {
this.frameworkModel = frameworkModel;
@@ -166,9 +166,9 @@ public class DubboCodec extends ExchangeCodec {
DecodeableRpcInvocation inv;
if (isDecodeDataInIoThread(channel)) {
if (customByteAccessor != null) {
- inv = customByteAccessor.getRpcInvocation(channel,
req, is, proto);
+ inv = customByteAccessor.getRpcInvocation(channel,
req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);
} else {
- inv = new DecodeableRpcInvocation(frameworkModel,
channel, req, is, proto);
+ inv = new DecodeableRpcInvocation(frameworkModel,
channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
inv.decode();
} else {
@@ -200,17 +200,29 @@ public class DubboCodec extends ExchangeCodec {
}
private boolean isDecodeDataInIoThread(Channel channel) {
- boolean decodeDataInIoThread =
channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY,
DEFAULT_DECODE_IN_IO_THREAD);
+ Object obj = channel.getAttribute(DECODE_IN_IO_THREAD_KEY);
+ if (obj instanceof Boolean) {
+ return (Boolean) obj;
+ }
+
String mode =
ExecutorRepository.getMode(channel.getUrl().getOrDefaultApplicationModel());
- if (EXECUTOR_MANAGEMENT_MODE_ISOLATION.equals(mode)) {
- if (!decodeDataInIoThread &&
decodeInUserThreadLogged.compareAndSet(false, true)) {
- log.info("Because thread pool isolation is enabled on the
dubbo protocol, the body can only be decoded " +
- "on the io thread, and the parameter[" +
DECODE_IN_IO_THREAD_KEY + "] will be ignored");
- // Why? because obtaining the isolated thread pool requires
the serviceKey of the service,
- // and this part must be decoded before it can be obtained
(more see DubboExecutorSupport)
- }
+ boolean isIsolated = EXECUTOR_MANAGEMENT_MODE_ISOLATION.equals(mode);
+
+ if (isIsolated && !decodeInUserThreadLogged.compareAndSet(false,
true)) {
+ channel.setAttribute(DECODE_IN_IO_THREAD_KEY, true);
+ return true;
+ }
+
+ boolean decodeDataInIoThread =
channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY,
DEFAULT_DECODE_IN_IO_THREAD);
+ if (isIsolated && !decodeDataInIoThread) {
+ log.info("Because thread pool isolation is enabled on the dubbo
protocol, the body can only be decoded " +
+ "on the io thread, and the parameter[" +
DECODE_IN_IO_THREAD_KEY + "] will be ignored");
+ // Why? because obtaining the isolated thread pool requires the
serviceKey of the service,
+ // and this part must be decoded before it can be obtained (more
see DubboExecutorSupport)
+ channel.setAttribute(DECODE_IN_IO_THREAD_KEY, true);
return true;
}
+ channel.setAttribute(DECODE_IN_IO_THREAD_KEY, decodeDataInIoThread);
return decodeDataInIoThread;
}
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboIsolationExecutorSupport.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboIsolationExecutorSupport.java
index 2beb1e403c..231aa546ca 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboIsolationExecutorSupport.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboIsolationExecutorSupport.java
@@ -16,42 +16,57 @@
*/
package org.apache.dubbo.rpc.protocol.dubbo;
-import org.apache.dubbo.common.ServiceKey;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.executor.AbstractIsolationExecutorSupport;
-
-import java.util.Map;
-
-import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import org.apache.dubbo.rpc.model.FrameworkServiceRepository;
+import org.apache.dubbo.rpc.model.ProviderModel;
+import org.apache.dubbo.rpc.model.ServiceModel;
public class DubboIsolationExecutorSupport extends
AbstractIsolationExecutorSupport {
private static final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(DubboIsolationExecutorSupport.class);
+ private final FrameworkServiceRepository frameworkServiceRepository;
+ private final DubboProtocol dubboProtocol;
+
public DubboIsolationExecutorSupport(URL url) {
super(url);
+ frameworkServiceRepository =
url.getOrDefaultFrameworkModel().getServiceRepository();
+ dubboProtocol =
DubboProtocol.getDubboProtocol(url.getOrDefaultFrameworkModel());
}
@Override
- protected ServiceKey getServiceKey(Object data) {
+ protected ProviderModel getProviderModel(Object data) {
if (!(data instanceof Request)) {
return null;
}
Request request = (Request) data;
- if (!(request.getData() instanceof Invocation)) {
+ if (!(request.getData() instanceof DecodeableRpcInvocation)) {
return null;
}
- Invocation inv = (Invocation) request.getData();
- Map<String, String> attachments = inv.getAttachments();
- String interfaceName = attachments.get(PATH_KEY);
- String version = attachments.get(VERSION_KEY);
- String group = attachments.get(GROUP_KEY);
- return new ServiceKey(interfaceName, version, group);
+
+ try {
+ ((DecodeableRpcInvocation)
request.getData()).fillInvoker(dubboProtocol);
+ } catch (RemotingException e) {
+ // ignore here, and this exception will being rethrow in
DubboProtocol
+ }
+
+ ServiceModel serviceModel = ((Invocation)
request.getData()).getServiceModel();
+ if (serviceModel instanceof ProviderModel) {
+ return (ProviderModel) serviceModel;
+ }
+
+ String targetServiceUniqueName = ((Invocation)
request.getData()).getTargetServiceUniqueName();
+ if (StringUtils.isNotEmpty(targetServiceUniqueName)) {
+ return
frameworkServiceRepository.lookupExportedService(targetServiceUniqueName);
+ }
+
+ return null;
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index eb17a4f1e4..bfd5f701df 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -126,8 +126,7 @@ public class DubboProtocol extends AbstractProtocol {
}
Invocation inv = (Invocation) message;
- Invoker<?> invoker = getInvoker(channel, inv);
- inv.setServiceModel(invoker.getUrl().getServiceModel());
+ Invoker<?> invoker = inv.getInvoker() == null ?
getInvoker(channel, inv) : inv.getInvoker();
// switch TCCL
if (invoker.getUrl().getServiceModel() != null) {
Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader());
@@ -293,7 +292,9 @@ public class DubboProtocol extends AbstractProtocol {
", channel: consumer: " + channel.getRemoteAddress() + " -->
provider: " + channel.getLocalAddress() + ", message:" +
getInvocationWithoutData(inv));
}
- return exporter.getInvoker();
+ Invoker<?> invoker = exporter.getInvoker();
+ inv.setServiceModel(invoker.getUrl().getServiceModel());
+ return invoker;
}
public Collection<Invoker<?>> getInvokers() {
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocationTest.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocationTest.java
index da86b50045..f4a39cf5d0 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocationTest.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocationTest.java
@@ -32,6 +32,7 @@ import org.apache.dubbo.remoting.transport.CodecSupport;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.protocol.PermittedSerializationKeeper;
import org.apache.dubbo.rpc.protocol.dubbo.decode.MockChannel;
import org.apache.dubbo.rpc.protocol.dubbo.support.DemoService;
@@ -68,7 +69,8 @@ class DecodeableRpcInvocationTest {
FrameworkModel frameworkModel = new FrameworkModel();
ApplicationModel applicationModel = frameworkModel.newApplication();
applicationModel.getDefaultModule().getServiceRepository().registerService(DemoService.class.getName(),
DemoService.class);
- frameworkModel.getServiceRepository().registerProviderUrl(url);
+
frameworkModel.getBeanFactory().getBean(PermittedSerializationKeeper.class)
+ .registerService(url);
// Simulate the server to decode
Channel channel = new MockChannel();
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/DubboTelnetDecodeTest.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/DubboTelnetDecodeTest.java
index cecc45dad9..9f05c81fe2 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/DubboTelnetDecodeTest.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/decode/DubboTelnetDecodeTest.java
@@ -35,6 +35,7 @@ import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.ModuleServiceRepository;
+import org.apache.dubbo.rpc.protocol.PermittedSerializationKeeper;
import org.apache.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation;
import org.apache.dubbo.rpc.protocol.dubbo.DubboCodec;
import org.apache.dubbo.rpc.protocol.dubbo.support.DemoService;
@@ -53,8 +54,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.dubbo.rpc.Constants.SERIALIZATION_SECURITY_CHECK_KEY;
-
/**
* These junit tests aim to test unpack and stick pack of dubbo and telnet
*/
@@ -75,16 +74,11 @@ class DubboTelnetDecodeTest {
public static void setup() {
ModuleServiceRepository serviceRepository =
ApplicationModel.defaultModel().getDefaultModule().getServiceRepository();
serviceRepository.registerService(DemoService.class);
-
- // disable
org.apache.dubbo.remoting.transport.CodecSupport.checkSerialization to avoid
error:
- // java.io.IOException: Service
org.apache.dubbo.rpc.protocol.dubbo.support.DemoService with version 0.0.0 not
found, invocation rejected.
- System.setProperty(SERIALIZATION_SECURITY_CHECK_KEY, "false");
}
@AfterAll
public static void teardown() {
FrameworkModel.defaultModel().destroy();
- System.clearProperty(SERIALIZATION_SECURITY_CHECK_KEY);
}
@@ -481,6 +475,9 @@ class DubboTelnetDecodeTest {
// register
// frameworkModel.getServiceRepository().registerProviderUrl();
+
FrameworkModel.defaultModel().getBeanFactory().getBean(PermittedSerializationKeeper.class)
+ .registerService(URL.valueOf("dubbo://127.0.0.1:20880/" +
DemoService.class.getName() + "?version=0.0.0"));
+
return dubboByteBuf;
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/dispatcher/ChannelHandlersTest.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/managemode/ChannelHandlersTest.java
similarity index 93%
rename from
dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/dispatcher/ChannelHandlersTest.java
rename to
dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/managemode/ChannelHandlersTest.java
index 8c7ccbf724..4876dd7b85 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/dispatcher/ChannelHandlersTest.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/managemode/ChannelHandlersTest.java
@@ -14,13 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.transport.dispatcher;
+package org.apache.dubbo.rpc.protocol.dubbo.managemode;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.url.component.ServiceConfigURL;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.transport.MultiMessageHandler;
-
+import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/handler/ConnectChannelHandlerTest.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/managemode/ConnectChannelHandlerTest.java
similarity index 99%
rename from
dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/handler/ConnectChannelHandlerTest.java
rename to
dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/managemode/ConnectChannelHandlerTest.java
index c131b8bbe7..352a8c2738 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/handler/ConnectChannelHandlerTest.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/managemode/ConnectChannelHandlerTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.handler;
+package org.apache.dubbo.rpc.protocol.dubbo.managemode;
import org.apache.dubbo.remoting.ExecutionException;
import org.apache.dubbo.remoting.RemotingException;
@@ -22,7 +22,6 @@ import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
import
org.apache.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedChannelHandler;
import org.apache.dubbo.rpc.model.ApplicationModel;
-
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
@@ -31,7 +30,6 @@ import org.junit.jupiter.api.Test;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
-
class ConnectChannelHandlerTest extends WrappedChannelHandlerTest {
@BeforeEach
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/managemode/MockedChannel.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/managemode/MockedChannel.java
new file mode 100644
index 0000000000..baa35eac69
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/managemode/MockedChannel.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo.managemode;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MockedChannel implements Channel {
+ private boolean isClosed;
+ private volatile boolean closing = false;
+ private URL url;
+ private ChannelHandler handler;
+ private Map<String, Object> map = new HashMap<String, Object>();
+
+ public MockedChannel() {
+ super();
+ }
+
+
+ @Override
+ public URL getUrl() {
+ return url;
+ }
+
+ @Override
+ public ChannelHandler getChannelHandler() {
+
+ return this.handler;
+ }
+
+ @Override
+ public InetSocketAddress getLocalAddress() {
+
+ return null;
+ }
+
+ @Override
+ public void send(Object message) throws RemotingException {
+ }
+
+ @Override
+ public void send(Object message, boolean sent) throws RemotingException {
+ this.send(message);
+ }
+
+ @Override
+ public void close() {
+ isClosed = true;
+ }
+
+ @Override
+ public void close(int timeout) {
+ this.close();
+ }
+
+ @Override
+ public void startClose() {
+ closing = true;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return isClosed;
+ }
+
+ @Override
+ public InetSocketAddress getRemoteAddress() {
+ return null;
+ }
+
+ @Override
+ public boolean isConnected() {
+ return false;
+ }
+
+ @Override
+ public boolean hasAttribute(String key) {
+ return map.containsKey(key);
+ }
+
+ @Override
+ public Object getAttribute(String key) {
+ return map.get(key);
+ }
+
+ @Override
+ public void setAttribute(String key, Object value) {
+ map.put(key, value);
+ }
+
+ @Override
+ public void removeAttribute(String key) {
+ map.remove(key);
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/managemode/MockedChannelHandler.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/managemode/MockedChannelHandler.java
new file mode 100644
index 0000000000..f2dc66abba
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/managemode/MockedChannelHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo.managemode;
+
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class MockedChannelHandler implements ChannelHandler {
+ // ConcurrentMap<String, Channel> channels = new
ConcurrentHashMap<String, Channel>();
+ ConcurrentHashSet<Channel> channels = new ConcurrentHashSet<Channel>();
+
+ @Override
+ public void connected(Channel channel) throws RemotingException {
+ channels.add(channel);
+ }
+
+ @Override
+ public void disconnected(Channel channel) throws RemotingException {
+ channels.remove(channel);
+ }
+
+ @Override
+ public void sent(Channel channel, Object message) throws RemotingException
{
+ channel.send(message);
+ }
+
+ @Override
+ public void received(Channel channel, Object message) throws
RemotingException {
+ //echo
+ channel.send(message);
+ }
+
+ @Override
+ public void caught(Channel channel, Throwable exception) throws
RemotingException {
+ throw new RemotingException(channel, exception);
+
+ }
+
+ public Set<Channel> getChannels() {
+ return Collections.unmodifiableSet(channels);
+ }
+}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/handler/WrappedChannelHandlerTest.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/managemode/WrappedChannelHandlerTest.java
similarity index 98%
rename from
dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/handler/WrappedChannelHandlerTest.java
rename to
dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/managemode/WrappedChannelHandlerTest.java
index 4ef2f5203a..e0e567d9b8 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/handler/WrappedChannelHandlerTest.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/managemode/WrappedChannelHandlerTest.java
@@ -14,8 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.handler;
-
+package org.apache.dubbo.rpc.protocol.dubbo.managemode;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
@@ -28,7 +27,6 @@ import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
import org.apache.dubbo.remoting.transport.dispatcher.WrappedChannelHandler;
import org.apache.dubbo.rpc.model.ApplicationModel;
-
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -40,7 +38,7 @@ import static org.junit.jupiter.api.Assertions.fail;
class WrappedChannelHandlerTest {
WrappedChannelHandler handler;
- URL url = URL.valueOf("test://10.20.30.40:1234");
+ URL url = URL.valueOf("dubbo://10.20.30.40:1234");
@BeforeEach
public void setUp() throws Exception {