This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 94a50e5d4e Set isolated executor for internal services (#12067)
94a50e5d4e is described below
commit 94a50e5d4e7f937ac3df4b12cdaa9bb1c45593ca
Author: Albumen Kevin <[email protected]>
AuthorDate: Tue Apr 11 21:29:30 2023 +0800
Set isolated executor for internal services (#12067)
---
.../dubbo/common/constants/CommonConstants.java | 4 +---
.../manager/DefaultExecutorRepository.java | 18 ++----------------
.../manager/FrameworkExecutorRepository.java | 11 +++++++++++
.../builders/InternalServiceConfigBuilder.java | 21 +++++++++------------
.../deploy/DefaultMetricsServiceExporter.java | 9 +++++++--
.../ConfigurableMetadataServiceExporter.java | 5 +++++
6 files changed, 35 insertions(+), 33 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index c8f1ab7a9e..290a2a8747 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -118,8 +118,6 @@ public interface CommonConstants {
String CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY =
"CONSUMER_SHARED_SERVICE_EXECUTOR";
- String INTERNAL_EXECUTOR_SERVICE_COMPONENT_KEY =
"INTERNAL_SERVICE_EXECUTOR";
-
String THREADPOOL_KEY = "threadpool";
String THREAD_NAME_KEY = "threadname";
@@ -626,6 +624,6 @@ public interface CommonConstants {
String BYTE_ACCESSOR_KEY = "byte.accessor";
String PAYLOAD = "payload";
-
+
String DUBBO_METRICS_CONFIGCENTER_ENABLE =
"dubbo.metrics.configcenter.enable";
}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
index c792ca07c3..ae1c9aaacc 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
@@ -36,7 +36,6 @@ import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.rpc.model.ModuleModel;
import org.apache.dubbo.rpc.model.ProviderModel;
-import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.rpc.model.ServiceModel;
import java.util.Map;
@@ -53,7 +52,6 @@ import static
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_EXPORT_T
import static
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_PROTOCOL;
import static
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_REFER_THREAD_NUM;
import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY;
-import static
org.apache.dubbo.common.constants.CommonConstants.INTERNAL_EXECUTOR_SERVICE_COMPONENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
import static
org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;
@@ -171,25 +169,13 @@ public class DefaultExecutorRepository implements
ExecutorRepository, ExtensionA
* @return
*/
private String getExecutorKey(URL url) {
- String executorKey = INTERNAL_EXECUTOR_SERVICE_COMPONENT_KEY;
- ServiceDescriptor serviceDescriptor =
applicationModel.getInternalModule().getServiceRepository().lookupService(url.getServiceInterface());
- // if not found in internal service repository, then it's biz service
defined by user.
- if (serviceDescriptor == null) {
- executorKey = EXECUTOR_SERVICE_COMPONENT_KEY;
-
- }
-
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
- executorKey = CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY;
+ return CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY;
}
- return executorKey;
+ return EXECUTOR_SERVICE_COMPONENT_KEY;
}
private String getExecutorKey(ServiceModel serviceModel) {
- if (serviceModel.getModuleModel().isInternal()) {
- return INTERNAL_EXECUTOR_SERVICE_COMPONENT_KEY;
- }
-
if (serviceModel instanceof ProviderModel) {
return EXECUTOR_SERVICE_COMPONENT_KEY;
} else {
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/FrameworkExecutorRepository.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/FrameworkExecutorRepository.java
index ce70b83fa4..f0cf0bc618 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/FrameworkExecutorRepository.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/FrameworkExecutorRepository.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -56,6 +57,8 @@ public class FrameworkExecutorRepository implements
Disposable {
private final Ring<ExecutorService> executorServiceRing = new Ring<>();
+ private final ExecutorService internalServiceExecutor;
+
public FrameworkExecutorRepository() {
sharedExecutor = Executors.newCachedThreadPool(new
NamedThreadFactory("Dubbo-framework-shared-handler", true));
sharedScheduledExecutor = Executors.newScheduledThreadPool(8, new
NamedThreadFactory("Dubbo-framework-shared-scheduler", true));
@@ -89,6 +92,9 @@ public class FrameworkExecutorRepository implements
Disposable {
}
metadataRetryExecutor = Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("Dubbo-framework-metadata-retry"));
+ internalServiceExecutor = new ThreadPoolExecutor(0, 100, 60L,
TimeUnit.SECONDS,
+ new SynchronousQueue<>(), new
NamedInternalThreadFactory("Dubbo-internal-service", true),
+ new ThreadPoolExecutor.AbortPolicy());
}
/**
@@ -122,6 +128,10 @@ public class FrameworkExecutorRepository implements
Disposable {
return metadataRetryExecutor;
}
+ public ExecutorService getInternalServiceExecutor() {
+ return internalServiceExecutor;
+ }
+
/**
* Get the default shared thread pool.
*
@@ -177,6 +187,7 @@ public class FrameworkExecutorRepository implements
Disposable {
logger.info("destroying framework executor repository ..");
shutdownExecutorService(poolRouterExecutor, "poolRouterExecutor");
shutdownExecutorService(metadataRetryExecutor,
"metadataRetryExecutor");
+ shutdownExecutorService(internalServiceExecutor,
"internalServiceExecutor");
// scheduledExecutors
shutdownExecutorServices(scheduledExecutors.listItems(),
"scheduledExecutors");
diff --git
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/builders/InternalServiceConfigBuilder.java
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/builders/InternalServiceConfigBuilder.java
index a0b27cdbf8..6e37707f28 100644
---
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/builders/InternalServiceConfigBuilder.java
+++
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/builders/InternalServiceConfigBuilder.java
@@ -34,20 +34,17 @@ import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ModuleModel;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static
org.apache.dubbo.common.constants.CommonConstants.APPLICATION_PROTOCOL_KEY;
-import static
org.apache.dubbo.common.constants.CommonConstants.CORE_THREADS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_PROTOCOL;
-import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.INTERNAL_ERROR;
import static org.apache.dubbo.remoting.Constants.BIND_PORT_KEY;
@@ -61,6 +58,7 @@ public class InternalServiceConfigBuilder<T> {
private Integer port;
private String registryId;
private Class<T> interfaceClass;
+ private Executor executor;
private T ref;
private InternalServiceConfigBuilder(ApplicationModel applicationModel) {
@@ -78,6 +76,11 @@ public class InternalServiceConfigBuilder<T> {
return getThis();
}
+ public InternalServiceConfigBuilder<T> executor(Executor executor) {
+ this.executor = executor;
+ return getThis();
+ }
+
public InternalServiceConfigBuilder<T> ref(T ref) {
this.ref = ref;
return getThis();
@@ -87,7 +90,7 @@ public class InternalServiceConfigBuilder<T> {
this.registryId = registryId;
return getThis();
}
-
+
public InternalServiceConfigBuilder<T> protocol(String protocol, String
key) {
if (StringUtils.isEmpty(protocol) && StringUtils.isNotBlank(key)) {
Map<String, String> params =
getApplicationConfig().getParameters();
@@ -273,13 +276,7 @@ public class InternalServiceConfigBuilder<T> {
serviceConfig.setGroup(applicationConfig.getName());
serviceConfig.setVersion("1.0.0");
- serviceConfig.setExecutes(100); // max tasks running at the same time
- Map<String, String> params = new HashMap<>();
- params.put(THREADPOOL_KEY, "cached");
- params.put(THREADS_KEY, "100");
- params.put(CORE_THREADS_KEY, "2");
-
- serviceConfig.setParameters(params);
+ serviceConfig.setExecutor(executor);
if (null != configConsumer) {
configConsumer.accept(serviceConfig);
diff --git
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultMetricsServiceExporter.java
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultMetricsServiceExporter.java
index 0efaaff71a..c904ed39c7 100644
---
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultMetricsServiceExporter.java
+++
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultMetricsServiceExporter.java
@@ -19,15 +19,17 @@ package org.apache.dubbo.config.deploy;
import org.apache.dubbo.common.constants.LoggerCodeConstants;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.metrics.service.MetricsService;
-import org.apache.dubbo.metrics.service.MetricsServiceExporter;
+import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.config.MetricsConfig;
import org.apache.dubbo.config.ServiceConfig;
import org.apache.dubbo.config.bootstrap.builders.InternalServiceConfigBuilder;
+import org.apache.dubbo.metrics.service.MetricsService;
+import org.apache.dubbo.metrics.service.MetricsServiceExporter;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ScopeModelAware;
import java.util.Optional;
+import java.util.concurrent.ExecutorService;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_METRICS_COLLECTOR_EXCEPTION;
import static
org.apache.dubbo.common.constants.MetricsConstants.PROTOCOL_PROMETHEUS;
@@ -70,10 +72,13 @@ public class DefaultMetricsServiceExporter implements
MetricsServiceExporter, Sc
public MetricsServiceExporter export() {
if (metricsService != null) {
if (!isExported()) {
+ ExecutorService internalServiceExecutor =
applicationModel.getFrameworkModel().getBeanFactory()
+
.getBean(FrameworkExecutorRepository.class).getInternalServiceExecutor();
ServiceConfig<MetricsService> serviceConfig =
InternalServiceConfigBuilder.<MetricsService>newBuilder(applicationModel)
.interfaceClass(MetricsService.class)
.protocol(getMetricsConfig().getExportServiceProtocol())
.port(getMetricsConfig().getExportServicePort())
+ .executor(internalServiceExecutor)
.ref(metricsService)
.registryId("internal-metrics-registry")
.build();
diff --git
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ConfigurableMetadataServiceExporter.java
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ConfigurableMetadataServiceExporter.java
index aceaaf057b..dc62bc803e 100644
---
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ConfigurableMetadataServiceExporter.java
+++
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ConfigurableMetadataServiceExporter.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.config.metadata;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ArgumentConfig;
import org.apache.dubbo.config.MethodConfig;
@@ -30,6 +31,7 @@ import org.apache.dubbo.rpc.model.ApplicationModel;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import static java.util.Collections.emptyList;
import static
org.apache.dubbo.common.constants.CommonConstants.METADATA_SERVICE_PORT_KEY;
@@ -55,11 +57,14 @@ public class ConfigurableMetadataServiceExporter {
public synchronized ConfigurableMetadataServiceExporter export() {
if (serviceConfig == null || !isExported()) {
+ ExecutorService internalServiceExecutor =
applicationModel.getFrameworkModel().getBeanFactory()
+
.getBean(FrameworkExecutorRepository.class).getInternalServiceExecutor();
this.serviceConfig =
InternalServiceConfigBuilder.<MetadataService>newBuilder(applicationModel)
.interfaceClass(MetadataService.class)
.protocol(getApplicationConfig().getMetadataServiceProtocol(),
METADATA_SERVICE_PROTOCOL_KEY)
.port(getApplicationConfig().getMetadataServicePort(),
METADATA_SERVICE_PORT_KEY)
.registryId("internal-metadata-registry")
+ .executor(internalServiceExecutor)
.ref(metadataService)
.build(configConsumer ->
configConsumer.setMethods(generateMethodConfig()));