This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 94a50e5d4e Set isolated executor for internal services (#12067)
94a50e5d4e is described below

commit 94a50e5d4e7f937ac3df4b12cdaa9bb1c45593ca
Author: Albumen Kevin <[email protected]>
AuthorDate: Tue Apr 11 21:29:30 2023 +0800

    Set isolated executor for internal services (#12067)
---
 .../dubbo/common/constants/CommonConstants.java     |  4 +---
 .../manager/DefaultExecutorRepository.java          | 18 ++----------------
 .../manager/FrameworkExecutorRepository.java        | 11 +++++++++++
 .../builders/InternalServiceConfigBuilder.java      | 21 +++++++++------------
 .../deploy/DefaultMetricsServiceExporter.java       |  9 +++++++--
 .../ConfigurableMetadataServiceExporter.java        |  5 +++++
 6 files changed, 35 insertions(+), 33 deletions(-)

diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index c8f1ab7a9e..290a2a8747 100644
--- 
a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -118,8 +118,6 @@ public interface CommonConstants {
 
     String CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY = 
"CONSUMER_SHARED_SERVICE_EXECUTOR";
 
-    String INTERNAL_EXECUTOR_SERVICE_COMPONENT_KEY = 
"INTERNAL_SERVICE_EXECUTOR";
-
     String THREADPOOL_KEY = "threadpool";
 
     String THREAD_NAME_KEY = "threadname";
@@ -626,6 +624,6 @@ public interface CommonConstants {
     String BYTE_ACCESSOR_KEY = "byte.accessor";
 
     String PAYLOAD = "payload";
-    
+
     String DUBBO_METRICS_CONFIGCENTER_ENABLE = 
"dubbo.metrics.configcenter.enable";
 }
diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
index c792ca07c3..ae1c9aaacc 100644
--- 
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
@@ -36,7 +36,6 @@ import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.model.ConsumerModel;
 import org.apache.dubbo.rpc.model.ModuleModel;
 import org.apache.dubbo.rpc.model.ProviderModel;
-import org.apache.dubbo.rpc.model.ServiceDescriptor;
 import org.apache.dubbo.rpc.model.ServiceModel;
 
 import java.util.Map;
@@ -53,7 +52,6 @@ import static 
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_EXPORT_T
 import static 
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_PROTOCOL;
 import static 
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_REFER_THREAD_NUM;
 import static 
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY;
-import static 
org.apache.dubbo.common.constants.CommonConstants.INTERNAL_EXECUTOR_SERVICE_COMPONENT_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
 import static 
org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;
@@ -171,25 +169,13 @@ public class DefaultExecutorRepository implements 
ExecutorRepository, ExtensionA
      * @return
      */
     private String getExecutorKey(URL url) {
-        String executorKey = INTERNAL_EXECUTOR_SERVICE_COMPONENT_KEY;
-        ServiceDescriptor serviceDescriptor = 
applicationModel.getInternalModule().getServiceRepository().lookupService(url.getServiceInterface());
-        // if not found in internal service repository, then it's biz service 
defined by user.
-        if (serviceDescriptor == null) {
-            executorKey = EXECUTOR_SERVICE_COMPONENT_KEY;
-
-        }
-
         if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
-            executorKey = CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY;
+            return CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY;
         }
-        return executorKey;
+        return EXECUTOR_SERVICE_COMPONENT_KEY;
     }
 
     private String getExecutorKey(ServiceModel serviceModel) {
-        if (serviceModel.getModuleModel().isInternal()) {
-            return INTERNAL_EXECUTOR_SERVICE_COMPONENT_KEY;
-        }
-
         if (serviceModel instanceof ProviderModel) {
             return EXECUTOR_SERVICE_COMPONENT_KEY;
         } else {
diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/FrameworkExecutorRepository.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/FrameworkExecutorRepository.java
index ce70b83fa4..f0cf0bc618 100644
--- 
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/FrameworkExecutorRepository.java
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/FrameworkExecutorRepository.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -56,6 +57,8 @@ public class FrameworkExecutorRepository implements 
Disposable {
 
     private final Ring<ExecutorService> executorServiceRing = new Ring<>();
 
+    private final ExecutorService internalServiceExecutor;
+
     public FrameworkExecutorRepository() {
         sharedExecutor = Executors.newCachedThreadPool(new 
NamedThreadFactory("Dubbo-framework-shared-handler", true));
         sharedScheduledExecutor = Executors.newScheduledThreadPool(8, new 
NamedThreadFactory("Dubbo-framework-shared-scheduler", true));
@@ -89,6 +92,9 @@ public class FrameworkExecutorRepository implements 
Disposable {
         }
 
         metadataRetryExecutor = Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("Dubbo-framework-metadata-retry"));
+        internalServiceExecutor = new ThreadPoolExecutor(0, 100, 60L, 
TimeUnit.SECONDS,
+            new SynchronousQueue<>(), new 
NamedInternalThreadFactory("Dubbo-internal-service", true),
+            new ThreadPoolExecutor.AbortPolicy());
     }
 
     /**
@@ -122,6 +128,10 @@ public class FrameworkExecutorRepository implements 
Disposable {
         return metadataRetryExecutor;
     }
 
+    public ExecutorService getInternalServiceExecutor() {
+        return internalServiceExecutor;
+    }
+
     /**
      * Get the default shared thread pool.
      *
@@ -177,6 +187,7 @@ public class FrameworkExecutorRepository implements 
Disposable {
         logger.info("destroying framework executor repository ..");
         shutdownExecutorService(poolRouterExecutor, "poolRouterExecutor");
         shutdownExecutorService(metadataRetryExecutor, 
"metadataRetryExecutor");
+        shutdownExecutorService(internalServiceExecutor, 
"internalServiceExecutor");
 
         // scheduledExecutors
         shutdownExecutorServices(scheduledExecutors.listItems(), 
"scheduledExecutors");
diff --git 
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/builders/InternalServiceConfigBuilder.java
 
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/builders/InternalServiceConfigBuilder.java
index a0b27cdbf8..6e37707f28 100644
--- 
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/builders/InternalServiceConfigBuilder.java
+++ 
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/builders/InternalServiceConfigBuilder.java
@@ -34,20 +34,17 @@ import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.model.ModuleModel;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.dubbo.common.constants.CommonConstants.APPLICATION_PROTOCOL_KEY;
-import static 
org.apache.dubbo.common.constants.CommonConstants.CORE_THREADS_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_PROTOCOL;
-import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
 import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.INTERNAL_ERROR;
 import static org.apache.dubbo.remoting.Constants.BIND_PORT_KEY;
 
@@ -61,6 +58,7 @@ public class InternalServiceConfigBuilder<T> {
     private Integer port;
     private String registryId;
     private Class<T> interfaceClass;
+    private Executor executor;
     private T   ref;
 
     private InternalServiceConfigBuilder(ApplicationModel applicationModel) {
@@ -78,6 +76,11 @@ public class InternalServiceConfigBuilder<T> {
         return getThis();
     }
 
+    public InternalServiceConfigBuilder<T> executor(Executor executor) {
+        this.executor = executor;
+        return getThis();
+    }
+
     public InternalServiceConfigBuilder<T> ref(T ref) {
         this.ref = ref;
         return getThis();
@@ -87,7 +90,7 @@ public class InternalServiceConfigBuilder<T> {
         this.registryId = registryId;
         return getThis();
     }
-    
+
     public InternalServiceConfigBuilder<T> protocol(String protocol, String 
key) {
         if (StringUtils.isEmpty(protocol) && StringUtils.isNotBlank(key)) {
             Map<String, String> params = 
getApplicationConfig().getParameters();
@@ -273,13 +276,7 @@ public class InternalServiceConfigBuilder<T> {
         serviceConfig.setGroup(applicationConfig.getName());
         serviceConfig.setVersion("1.0.0");
 
-        serviceConfig.setExecutes(100); // max tasks running at the same time
-        Map<String, String> params = new HashMap<>();
-        params.put(THREADPOOL_KEY, "cached");
-        params.put(THREADS_KEY, "100");
-        params.put(CORE_THREADS_KEY, "2");
-
-        serviceConfig.setParameters(params);
+        serviceConfig.setExecutor(executor);
 
         if (null != configConsumer) {
             configConsumer.accept(serviceConfig);
diff --git 
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultMetricsServiceExporter.java
 
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultMetricsServiceExporter.java
index 0efaaff71a..c904ed39c7 100644
--- 
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultMetricsServiceExporter.java
+++ 
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultMetricsServiceExporter.java
@@ -19,15 +19,17 @@ package org.apache.dubbo.config.deploy;
 import org.apache.dubbo.common.constants.LoggerCodeConstants;
 import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
 import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.metrics.service.MetricsService;
-import org.apache.dubbo.metrics.service.MetricsServiceExporter;
+import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
 import org.apache.dubbo.config.MetricsConfig;
 import org.apache.dubbo.config.ServiceConfig;
 import org.apache.dubbo.config.bootstrap.builders.InternalServiceConfigBuilder;
+import org.apache.dubbo.metrics.service.MetricsService;
+import org.apache.dubbo.metrics.service.MetricsServiceExporter;
 import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.model.ScopeModelAware;
 
 import java.util.Optional;
+import java.util.concurrent.ExecutorService;
 
 import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_METRICS_COLLECTOR_EXCEPTION;
 import static 
org.apache.dubbo.common.constants.MetricsConstants.PROTOCOL_PROMETHEUS;
@@ -70,10 +72,13 @@ public class DefaultMetricsServiceExporter implements 
MetricsServiceExporter, Sc
     public MetricsServiceExporter export() {
         if (metricsService != null) {
             if (!isExported()) {
+                ExecutorService internalServiceExecutor = 
applicationModel.getFrameworkModel().getBeanFactory()
+                    
.getBean(FrameworkExecutorRepository.class).getInternalServiceExecutor();
                 ServiceConfig<MetricsService> serviceConfig = 
InternalServiceConfigBuilder.<MetricsService>newBuilder(applicationModel)
                     .interfaceClass(MetricsService.class)
                     .protocol(getMetricsConfig().getExportServiceProtocol())
                     .port(getMetricsConfig().getExportServicePort())
+                    .executor(internalServiceExecutor)
                     .ref(metricsService)
                     .registryId("internal-metrics-registry")
                     .build();
diff --git 
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ConfigurableMetadataServiceExporter.java
 
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ConfigurableMetadataServiceExporter.java
index aceaaf057b..dc62bc803e 100644
--- 
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ConfigurableMetadataServiceExporter.java
+++ 
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ConfigurableMetadataServiceExporter.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.config.metadata;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
 import org.apache.dubbo.config.ApplicationConfig;
 import org.apache.dubbo.config.ArgumentConfig;
 import org.apache.dubbo.config.MethodConfig;
@@ -30,6 +31,7 @@ import org.apache.dubbo.rpc.model.ApplicationModel;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import static java.util.Collections.emptyList;
 import static 
org.apache.dubbo.common.constants.CommonConstants.METADATA_SERVICE_PORT_KEY;
@@ -55,11 +57,14 @@ public class ConfigurableMetadataServiceExporter {
 
     public synchronized ConfigurableMetadataServiceExporter export() {
         if (serviceConfig == null || !isExported()) {
+            ExecutorService internalServiceExecutor = 
applicationModel.getFrameworkModel().getBeanFactory()
+                
.getBean(FrameworkExecutorRepository.class).getInternalServiceExecutor();
             this.serviceConfig = 
InternalServiceConfigBuilder.<MetadataService>newBuilder(applicationModel)
                 .interfaceClass(MetadataService.class)
                 .protocol(getApplicationConfig().getMetadataServiceProtocol(), 
METADATA_SERVICE_PROTOCOL_KEY)
                 .port(getApplicationConfig().getMetadataServicePort(), 
METADATA_SERVICE_PORT_KEY)
                 .registryId("internal-metadata-registry")
+                .executor(internalServiceExecutor)
                 .ref(metadataService)
                 .build(configConsumer -> 
configConsumer.setMethods(generateMethodConfig()));
 

Reply via email to