This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 0246327083 Polish config, expose rpc config in application.yml (#14501)
0246327083 is described below
commit 0246327083ab3d52f97051ac4bbabb6847ec6a10
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Jul 11 09:29:31 2023 +0800
Polish config, expose rpc config in application.yml (#14501)
---
docs/docs/en/architecture/configuration.md | 10 +--
docs/docs/zh/architecture/configuration.md | 10 +--
.../api/audit/AuditPublishService.java | 10 +--
.../api/configuration/ApiConfig.java | 90 ++++++++++++++++++++++
.../api/configuration/AppConfiguration.java | 7 +-
.../api/configuration/AuditConfiguration.java | 38 ---------
.../configuration/PythonGatewayConfiguration.java | 38 ---------
.../api/configuration/TrafficConfiguration.java | 38 ---------
.../api/interceptor/RateLimitInterceptor.java | 6 +-
.../dolphinscheduler/api/python/PythonGateway.java | 7 +-
.../src/main/resources/application.yaml | 76 +++++++++---------
...icConfigurationTest.java => ApiConfigTest.java} | 32 ++++----
.../api/configuration/AuditConfigurationTest.java | 37 ---------
.../api/interceptor/RateLimitInterceptorTest.java | 8 +-
.../src/test/resources/application.yaml | 38 ++++++++-
.../server/master/MasterServer.java | 6 ++
.../server/master/config/MasterConfig.java | 8 ++
.../server/master/rpc/MasterRPCServer.java | 2 +-
.../server/master/rpc/MasterRpcClient.java | 25 ++++--
.../master/runner/WorkflowExecuteRunnable.java | 33 +++-----
.../src/main/resources/application.yaml | 61 +++++++++------
.../server/worker/config/WorkerConfig.java | 5 ++
.../server/worker/rpc/WorkerRpcClient.java | 9 ++-
.../server/worker/rpc/WorkerRpcServer.java | 2 +-
24 files changed, 298 insertions(+), 298 deletions(-)
diff --git a/docs/docs/en/architecture/configuration.md
b/docs/docs/en/architecture/configuration.md
index 05d9e99ebc..acbd4c87b0 100644
--- a/docs/docs/en/architecture/configuration.md
+++ b/docs/docs/en/architecture/configuration.md
@@ -259,11 +259,11 @@ Location: `api-server/conf/application.yaml`
|security.authentication.ldap.ssl.enable|false|LDAP switch|
|security.authentication.ldap.ssl.trust-store|ldapkeystore.jks|LDAP jks file
absolute path|
|security.authentication.ldap.ssl.trust-store-password|password|LDAP jks
password|
-|traffic.control.global.switch|false|traffic control global switch|
-|traffic.control.max-global-qps-rate|300|global max request number per second|
-|traffic.control.tenant-switch|false|traffic control tenant switch|
-|traffic.control.default-tenant-qps-rate|10|default tenant max request number
per second|
-|traffic.control.customize-tenant-qps-rate||customize tenant max request
number per second|
+|api.traffic-control.global-switch|false|traffic control global switch|
+|api.traffic-control.max-global-qps-rate|300|global max request number per
second|
+|api.traffic-control.tenant-switch|false|traffic control tenant switch|
+|api.traffic-control.default-tenant-qps-rate|10|default tenant max request
number per second|
+|api.traffic-control.customize-tenant-qps-rate||customize tenant max request
number per second|
### Master Server related configuration
diff --git a/docs/docs/zh/architecture/configuration.md
b/docs/docs/zh/architecture/configuration.md
index 3d91a8797d..024a037ce0 100644
--- a/docs/docs/zh/architecture/configuration.md
+++ b/docs/docs/zh/architecture/configuration.md
@@ -257,11 +257,11 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId
|security.authentication.ldap.ssl.enable|false|LDAP ssl开关|
|security.authentication.ldap.ssl.trust-store|ldapkeystore.jks|LDAP jks文件绝对路径|
|security.authentication.ldap.ssl.trust-store-password|password|LDAP jks密码|
-|traffic.control.global.switch|false|流量控制全局开关|
-|traffic.control.max-global-qps-rate|300|全局最大请求数/秒|
-|traffic.control.tenant-switch|false|流量控制租户开关|
-|traffic.control.default-tenant-qps-rate|10|默认租户最大请求数/秒限制|
-|traffic.control.customize-tenant-qps-rate||自定义租户最大请求数/秒限制|
+|api.traffic-control.global-switch|false|流量控制全局开关|
+|api.traffic-control.max-global-qps-rate|300|全局最大请求数/秒|
+|api.traffic-control.tenant-switch|false|流量控制租户开关|
+|api.traffic-control.default-tenant-qps-rate|10|默认租户最大请求数/秒限制|
+|api.traffic-control.customize-tenant-qps-rate||自定义租户最大请求数/秒限制|
## Master Server相关配置
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/AuditPublishService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/AuditPublishService.java
index 6e47f87145..34a5cd8ac0 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/AuditPublishService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/AuditPublishService.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.api.audit;
-import org.apache.dolphinscheduler.api.configuration.AuditConfiguration;
+import org.apache.dolphinscheduler.api.configuration.ApiConfig;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@@ -34,20 +34,20 @@ import org.springframework.stereotype.Component;
@Slf4j
public class AuditPublishService {
- private BlockingQueue<AuditMessage> auditMessageQueue = new
LinkedBlockingQueue<>();
+ private final BlockingQueue<AuditMessage> auditMessageQueue = new
LinkedBlockingQueue<>();
@Autowired
private List<AuditSubscriber> subscribers;
@Autowired
- private AuditConfiguration auditConfiguration;
+ private ApiConfig apiConfig;
/**
* create a daemon thread to process the message queue
*/
@PostConstruct
private void init() {
- if (auditConfiguration.getEnabled()) {
+ if (apiConfig.isAuditEnable()) {
Thread thread = new Thread(this::doPublish);
thread.setDaemon(true);
thread.setName("Audit-Log-Consume-Thread");
@@ -61,7 +61,7 @@ public class AuditPublishService {
* @param message audit message
*/
public void publish(AuditMessage message) {
- if (auditConfiguration.getEnabled() &&
!auditMessageQueue.offer(message)) {
+ if (apiConfig.isAuditEnable() && !auditMessageQueue.offer(message)) {
log.error("Publish audit message failed, message:{}", message);
}
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/ApiConfig.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/ApiConfig.java
new file mode 100644
index 0000000000..3a33ad2d58
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/ApiConfig.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.validation.Errors;
+import org.springframework.validation.Validator;
+import org.springframework.validation.annotation.Validated;
+
+@Slf4j
+@Data
+@Validated
+@Configuration
+@ConfigurationProperties(value = "api")
+public class ApiConfig implements Validator {
+
+ private boolean auditEnable = false;
+
+ private TrafficConfiguration trafficControl = new TrafficConfiguration();
+
+ private PythonGatewayConfiguration pythonGateway = new
PythonGatewayConfiguration();
+
+ @Override
+ public boolean supports(Class<?> clazz) {
+ return ApiConfig.class.isAssignableFrom(clazz);
+ }
+
+ @Override
+ public void validate(Object target, Errors errors) {
+ printConfig();
+ }
+
+ private void printConfig() {
+ log.info("API config: auditEnable -> {} ", auditEnable);
+ log.info("API config: trafficControl -> {} ", trafficControl);
+ log.info("API config: pythonGateway -> {} ", pythonGateway);
+ }
+
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class TrafficConfiguration {
+
+ private boolean globalSwitch = false;
+ private Integer maxGlobalQpsRate = 300;
+ private boolean tenantSwitch = false;
+ private Integer defaultTenantQpsRate = 10;
+ private Map<String, Integer> customizeTenantQpsRate = new HashMap<>();
+ }
+
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class PythonGatewayConfiguration {
+
+ private boolean enabled = true;
+ private String gatewayServerAddress = "0.0.0.0";
+ private int gatewayServerPort = 25333;
+ private String pythonAddress = "127.0.0.1";
+ private int pythonPort = 25334;
+ private int connectTimeout = 0;
+ private int readTimeout = 0;
+ private String authToken = "jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc";
+ }
+
+}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AppConfiguration.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AppConfiguration.java
index 2d6b5f7bdd..9fde9eec17 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AppConfiguration.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AppConfiguration.java
@@ -50,7 +50,7 @@ public class AppConfiguration implements WebMvcConfigurer {
public static final String LOCALE_LANGUAGE_COOKIE = "language";
@Autowired
- private TrafficConfiguration trafficConfiguration;
+ private ApiConfig apiConfig;
@Bean
public CorsFilter corsFilter() {
@@ -90,14 +90,15 @@ public class AppConfiguration implements WebMvcConfigurer {
@Bean
public RateLimitInterceptor createRateLimitInterceptor() {
- return new RateLimitInterceptor(trafficConfiguration);
+ return new RateLimitInterceptor(apiConfig.getTrafficControl());
}
@Override
public void addInterceptors(InterceptorRegistry registry) {
// i18n
registry.addInterceptor(localeChangeInterceptor());
- if (trafficConfiguration.isGlobalSwitch() ||
trafficConfiguration.isTenantSwitch()) {
+ ApiConfig.TrafficConfiguration trafficControl =
apiConfig.getTrafficControl();
+ if (trafficControl.isGlobalSwitch() ||
trafficControl.isTenantSwitch()) {
registry.addInterceptor(createRateLimitInterceptor());
}
registry.addInterceptor(loginInterceptor())
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AuditConfiguration.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AuditConfiguration.java
deleted file mode 100644
index 451fcbc40b..0000000000
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AuditConfiguration.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.api.configuration;
-
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import
org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.stereotype.Component;
-
-@Component
-@EnableConfigurationProperties
-@ConfigurationProperties(value = "audit", ignoreUnknownFields = false)
-public class AuditConfiguration {
-
- private boolean enabled;
-
- public boolean getEnabled() {
- return enabled;
- }
-
- public void setEnabled(boolean enabled) {
- this.enabled = enabled;
- }
-}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/PythonGatewayConfiguration.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/PythonGatewayConfiguration.java
deleted file mode 100644
index a3f5f0dad7..0000000000
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/PythonGatewayConfiguration.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.api.configuration;
-
-import lombok.Data;
-
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.Configuration;
-
-@Data
-@Configuration
-@ConfigurationProperties(value = "python-gateway")
-public class PythonGatewayConfiguration {
-
- private boolean enabled;
- private String gatewayServerAddress;
- private int gatewayServerPort;
- private String pythonAddress;
- private int pythonPort;
- private int connectTimeout;
- private int readTimeout;
- private String authToken;
-}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/TrafficConfiguration.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/TrafficConfiguration.java
deleted file mode 100644
index 47cb5230f3..0000000000
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/TrafficConfiguration.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.api.configuration;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import lombok.Data;
-
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.Configuration;
-
-@Data
-@Configuration
-@ConfigurationProperties(prefix = "traffic.control")
-public class TrafficConfiguration {
-
- private boolean globalSwitch;
- private Integer maxGlobalQpsRate = 300;
- private boolean tenantSwitch;
- private Integer defaultTenantQpsRate = 10;
- private Map<String, Integer> customizeTenantQpsRate = new HashMap<>();
-}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptor.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptor.java
index 22dcafcbdc..1f0ec80ac8 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptor.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptor.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.api.interceptor;
-import org.apache.dolphinscheduler.api.configuration.TrafficConfiguration;
+import org.apache.dolphinscheduler.api.configuration.ApiConfig;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
@@ -47,7 +47,7 @@ import com.google.common.util.concurrent.RateLimiter;
@Slf4j
public class RateLimitInterceptor implements HandlerInterceptor {
- private TrafficConfiguration trafficConfiguration;
+ private ApiConfig.TrafficConfiguration trafficConfiguration;
private RateLimiter globalRateLimiter;
@@ -98,7 +98,7 @@ public class RateLimitInterceptor implements
HandlerInterceptor {
return true;
}
- public RateLimitInterceptor(TrafficConfiguration trafficConfiguration) {
+ public RateLimitInterceptor(ApiConfig.TrafficConfiguration
trafficConfiguration) {
this.trafficConfiguration = trafficConfiguration;
if (trafficConfiguration.isGlobalSwitch()) {
this.globalRateLimiter =
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index fe57f12d16..2451a9de6e 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.api.python;
-import
org.apache.dolphinscheduler.api.configuration.PythonGatewayConfiguration;
+import org.apache.dolphinscheduler.api.configuration.ApiConfig;
import org.apache.dolphinscheduler.api.dto.EnvironmentDto;
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
import org.apache.dolphinscheduler.api.enums.Status;
@@ -145,7 +145,7 @@ public class PythonGateway {
private DataSourceMapper dataSourceMapper;
@Autowired
- private PythonGatewayConfiguration pythonGatewayConfiguration;
+ private ApiConfig apiConfig;
@Autowired
private ProjectUserMapper projectUserMapper;
@@ -689,13 +689,14 @@ public class PythonGateway {
@PostConstruct
public void init() {
- if (pythonGatewayConfiguration.isEnabled()) {
+ if (apiConfig.getPythonGateway().isEnabled()) {
this.start();
}
}
private void start() {
try {
+ ApiConfig.PythonGatewayConfiguration pythonGatewayConfiguration =
apiConfig.getPythonGateway();
InetAddress gatewayHost =
InetAddress.getByName(pythonGatewayConfiguration.getGatewayServerAddress());
GatewayServerBuilder serverBuilder = new
GatewayServer.GatewayServerBuilder()
.entryPoint(this)
diff --git a/dolphinscheduler-api/src/main/resources/application.yaml
b/dolphinscheduler-api/src/main/resources/application.yaml
index 5d3154c6f5..1165437972 100644
--- a/dolphinscheduler-api/src/main/resources/application.yaml
+++ b/dolphinscheduler-api/src/main/resources/application.yaml
@@ -115,35 +115,46 @@ registry:
block-until-connected: 600ms
digest: ~
-audit:
- enabled: false
+api:
+ audit-enable: false
+ # Traffic control, if you turn on this config, the maximum number of
request/s will be limited.
+ # global max request number per second
+ # default tenant-level max request number
+ traffic-control:
+ global-switch: false
+ max-global-qps-rate: 300
+ tenant-switch: false
+ default-tenant-qps-rate: 10
+ #customize-tenant-qps-rate:
+ # eg.
+ #tenant1: 11
+ #tenant2: 20
+ python-gateway:
+ # Whether to enable the python gateway server or not. The default value is true.
+ enabled: true
+ # Authentication token for connection from python api to python gateway
server. The default value should be changed
+ # when you deploy in a public network.
+ auth-token: jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc
+ # The address on which the Python gateway server starts. Set its value to `0.0.0.0`
if your Python API runs on a different
+ # host from the Python gateway server. It could also be set to a specific address
like `127.0.0.1` or `localhost`
+ gateway-server-address: 0.0.0.0
+ # The port of Python gateway server start. Define which port you could
connect to Python gateway server from
+ # Python API side.
+ gateway-server-port: 25333
+ # The address of Python callback client.
+ python-address: 127.0.0.1
+ # The port of Python callback client.
+ python-port: 25334
+ # Close connection of socket server if no other request accept after x
milliseconds. Define value is (0 = infinite),
+ # and socket server would never close even though no requests accept
+ connect-timeout: 0
+ # Close each active connection of socket server if python program not
active after x milliseconds. Define value is
+ # (0 = infinite), and socket server would never close even though no
requests accept
+ read-timeout: 0
metrics:
enabled: true
-python-gateway:
- # Weather enable python gateway server or not. The default value is true.
- enabled: true
- # Authentication token for connection from python api to python gateway
server. Should be changed the default value
- # when you deploy in public network.
- auth-token: jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc
- # The address of Python gateway server start. Set its value to `0.0.0.0` if
your Python API run in different
- # between Python gateway server. It could be be specific to other address
like `127.0.0.1` or `localhost`
- gateway-server-address: 0.0.0.0
- # The port of Python gateway server start. Define which port you could
connect to Python gateway server from
- # Python API side.
- gateway-server-port: 25333
- # The address of Python callback client.
- python-address: 127.0.0.1
- # The port of Python callback client.
- python-port: 25334
- # Close connection of socket server if no other request accept after x
milliseconds. Define value is (0 = infinite),
- # and socket server would never close even though no requests accept
- connect-timeout: 0
- # Close each active connection of socket server if python program not active
after x milliseconds. Define value is
- # (0 = infinite), and socket server would never close even though no
requests accept
- read-timeout: 0
-
security:
authentication:
# Authentication types (supported types: PASSWORD,LDAP,CASDOOR_SSO)
@@ -168,21 +179,6 @@ security:
trust-store: "/ldapkeystore.jks"
trust-store-password: "password"
-# Traffic control, if you turn on this config, the maximum number of request/s
will be limited.
-# global max request number per second
-# default tenant-level max request number
-traffic:
- control:
- global-switch: false
- max-global-qps-rate: 300
- tenant-switch: false
- default-tenant-qps-rate: 10
- #customize-tenant-qps-rate:
- # eg.
- #tenant1: 11
- #tenant2: 20
-
-
# Override by profile
---
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/TrafficConfigurationTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/ApiConfigTest.java
similarity index 56%
rename from
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/TrafficConfigurationTest.java
rename to
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/ApiConfigTest.java
index d730f98afe..35650fea1e 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/TrafficConfigurationTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/ApiConfigTest.java
@@ -25,33 +25,33 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
-public class TrafficConfigurationTest extends AbstractControllerTest {
+public class ApiConfigTest extends AbstractControllerTest {
@Autowired
- private TrafficConfiguration trafficConfiguration;
+ private ApiConfig apiConfig;
@Test
- public void isTrafficGlobalControlSwitch() {
- Assertions.assertFalse(trafficConfiguration.isGlobalSwitch());
+ public void testIsAuditEnable() {
+ Assertions.assertTrue(apiConfig.isAuditEnable());
}
@Test
- public void getMaxGlobalQpsLimit() {
- Assertions.assertEquals(300, (int)
trafficConfiguration.getMaxGlobalQpsRate());
- }
+ public void testGetTrafficControlConfig() {
+ ApiConfig.TrafficConfiguration trafficControl =
apiConfig.getTrafficControl();
- @Test
- public void isTrafficTenantControlSwitch() {
- Assertions.assertFalse(trafficConfiguration.isTenantSwitch());
- }
+ Assertions.assertFalse(trafficControl.isGlobalSwitch());
+ Assertions.assertEquals(299, (int)
trafficControl.getMaxGlobalQpsRate());
+ Assertions.assertFalse(trafficControl.isTenantSwitch());
+ Assertions.assertEquals(9, (int)
trafficControl.getDefaultTenantQpsRate());
+
Assertions.assertTrue(MapUtils.isEmpty(trafficControl.getCustomizeTenantQpsRate()));
- @Test
- public void getDefaultTenantQpsLimit() {
- Assertions.assertEquals(10, (int)
trafficConfiguration.getDefaultTenantQpsRate());
}
@Test
- public void getCustomizeTenantQpsRate() {
-
Assertions.assertTrue(MapUtils.isEmpty(trafficConfiguration.getCustomizeTenantQpsRate()));
+ public void testGetPythonGateway() {
+ ApiConfig.PythonGatewayConfiguration pythonGateway =
apiConfig.getPythonGateway();
+
+ Assertions.assertFalse(pythonGateway.isEnabled());
}
+
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/AuditConfigurationTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/AuditConfigurationTest.java
deleted file mode 100644
index a2a3759c04..0000000000
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/AuditConfigurationTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.api.configuration;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.ActiveProfiles;
-
-@ActiveProfiles("audit")
-@SpringBootTest(classes = AuditConfiguration.class)
-public class AuditConfigurationTest {
-
- @Autowired
- private AuditConfiguration auditConfiguration;
-
- @Test
- public void isAuditGlobalControlSwitch() {
- Assertions.assertTrue(auditConfiguration.getEnabled());
- }
-}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptorTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptorTest.java
index 9dcd1111e2..a584c41eaf 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptorTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptorTest.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.api.interceptor;
-import org.apache.dolphinscheduler.api.configuration.TrafficConfiguration;
+import org.apache.dolphinscheduler.api.configuration.ApiConfig;
import java.util.HashMap;
import java.util.Map;
@@ -39,14 +39,14 @@ public class RateLimitInterceptorTest {
public void testPreHandleWithoutControl() throws ExecutionException {
HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
- RateLimitInterceptor rateLimitInterceptor = new
RateLimitInterceptor(new TrafficConfiguration());
+ RateLimitInterceptor rateLimitInterceptor = new
RateLimitInterceptor(new ApiConfig.TrafficConfiguration());
Assertions.assertTrue(rateLimitInterceptor.preHandle(request,
response, null));
Assertions.assertTrue(rateLimitInterceptor.preHandle(request,
response, null));
}
@Test
public void testPreHandleWithTenantLevenControl() throws
ExecutionException {
- TrafficConfiguration trafficConfiguration = new TrafficConfiguration();
+ ApiConfig.TrafficConfiguration trafficConfiguration = new
ApiConfig.TrafficConfiguration();
trafficConfiguration.setTenantSwitch(true);
Map<String, Integer> map = new HashMap<>();
map.put("tenant1", 2);
@@ -70,7 +70,7 @@ public class RateLimitInterceptorTest {
@Test
public void testPreHandleWithGlobalControl() throws ExecutionException {
- TrafficConfiguration trafficConfiguration = new TrafficConfiguration();
+ ApiConfig.TrafficConfiguration trafficConfiguration = new
ApiConfig.TrafficConfiguration();
trafficConfiguration.setTenantSwitch(true);
trafficConfiguration.setGlobalSwitch(true);
trafficConfiguration.setMaxGlobalQpsRate(3);
diff --git a/dolphinscheduler-api/src/test/resources/application.yaml
b/dolphinscheduler-api/src/test/resources/application.yaml
index b071412a6f..fda37e4ea4 100644
--- a/dolphinscheduler-api/src/test/resources/application.yaml
+++ b/dolphinscheduler-api/src/test/resources/application.yaml
@@ -27,5 +27,39 @@ spring:
registry:
type: zookeeper
-audit:
- enabled: true
\ No newline at end of file
+api:
+ audit-enable: true
+ # Traffic control, if you turn on this config, the maximum number of
request/s will be limited.
+ # global max request number per second
+ # default tenant-level max request number
+ traffic-control:
+ global-switch: false
+ max-global-qps-rate: 299
+ tenant-switch: false
+ default-tenant-qps-rate: 9
+ #customize-tenant-qps-rate:
+ # eg.
+ #tenant1: 11
+ #tenant2: 20
+ python-gateway:
+ # Whether to enable the python gateway server or not. The default value is true.
+ enabled: false
+ # Authentication token for connection from python api to python gateway
server. The default value should be changed
+ # when you deploy in a public network.
+ auth-token: jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc
+ # The address on which the Python gateway server starts. Set its value to `0.0.0.0`
if your Python API runs on a different
+ # host from the Python gateway server. It could also be set to a specific address
like `127.0.0.1` or `localhost`
+ gateway-server-address: 0.0.0.0
+ # The port of Python gateway server start. Define which port you could
connect to Python gateway server from
+ # Python API side.
+ gateway-server-port: 25333
+ # The address of Python callback client.
+ python-address: 127.0.0.1
+ # The port of Python callback client.
+ python-port: 25334
+ # Close connection of socket server if no other request accept after x
milliseconds. Define value is (0 = infinite),
+ # and socket server would never close even though no requests accept
+ connect-timeout: 0
+ # Close each active connection of socket server if python program not
active after x milliseconds. Define value is
+ # (0 = infinite), and socket server would never close even though no
requests accept
+ read-timeout: 0
\ No newline at end of file
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index dadd02fbf0..1bbe919353 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -25,6 +25,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
+import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
import
org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap;
@@ -73,6 +74,9 @@ public class MasterServer implements IStoppable {
@Autowired
private MasterRPCServer masterRPCServer;
+ @Autowired
+ private MasterRpcClient masterRpcClient;
+
public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
SpringApplication.run(MasterServer.class);
@@ -85,6 +89,7 @@ public class MasterServer implements IStoppable {
public void run() throws SchedulerException {
// init rpc server
this.masterRPCServer.start();
+ this.masterRpcClient.start();
// install task plugin
this.taskPluginManager.loadPlugin();
@@ -125,6 +130,7 @@ public class MasterServer implements IStoppable {
SchedulerApi closedSchedulerApi = schedulerApi;
MasterSchedulerBootstrap closedSchedulerBootstrap =
masterSchedulerBootstrap;
MasterRPCServer closedRpcServer = masterRPCServer;
+ MasterRpcClient closedRpcClient = masterRpcClient;
MasterRegistryClient closedMasterRegistryClient =
masterRegistryClient;
// close spring Context and will invoke method with
@PreDestroy annotation to destroy beans.
// like
ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index d65a3fe52a..05a79d1170 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -20,6 +20,8 @@ package org.apache.dolphinscheduler.server.master.config;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector;
import
org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
@@ -97,6 +99,10 @@ public class MasterConfig implements Validator {
private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L);
+ private NettyClientConfig masterRpcClientConfig = new NettyClientConfig();
+
+ private NettyServerConfig masterRpcServerConfig = new NettyServerConfig();
+
// ip:listenPort
private String masterAddress;
@@ -177,5 +183,7 @@ public class MasterConfig implements Validator {
log.info("Master config: masterAddress -> {} ", masterAddress);
log.info("Master config: masterRegistryPath -> {} ",
masterRegistryPath);
log.info("Master config: workerGroupRefreshInterval -> {} ",
workerGroupRefreshInterval);
+ log.info("Master config: masterRpcServerConfig -> {} ",
masterRpcServerConfig);
+ log.info("Master config: masterRpcClientConfig -> {} ",
masterRpcClientConfig);
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index ac9d4da19d..f5cadb7339 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -47,7 +47,7 @@ public class MasterRPCServer implements AutoCloseable {
public void start() {
log.info("Starting Master RPC Server...");
// init remoting server
- NettyServerConfig serverConfig = new NettyServerConfig();
+ NettyServerConfig serverConfig =
masterConfig.getMasterRpcServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
for (MasterRpcProcessor masterRpcProcessor : masterRpcProcessors) {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcClient.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcClient.java
index 225b27b060..15d976cbee 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcClient.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcClient.java
@@ -19,25 +19,28 @@ package org.apache.dolphinscheduler.server.master.rpc;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Message;
-import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
-public class MasterRpcClient {
+public class MasterRpcClient implements AutoCloseable {
- private final NettyRemotingClient client;
+ @Autowired
+ private MasterConfig masterConfig;
private static final long DEFAULT_TIME_OUT_MILLS = 10_000L;
+ private NettyRemotingClient client;
- public MasterRpcClient() {
- client = new NettyRemotingClient(new NettyClientConfig());
+ public void start() {
+ client = new
NettyRemotingClient(masterConfig.getMasterRpcClientConfig());
log.info("Success initialized MasterRPCClient...");
}
@@ -46,7 +49,15 @@ public class MasterRpcClient {
return client.sendSync(host, rpcMessage, DEFAULT_TIME_OUT_MILLS);
}
- public void send(Host of, Message message) throws RemotingException {
- client.send(of, message);
+ public void send(@NonNull Host host, @NonNull Message message) throws
RemotingException {
+ client.send(host, message);
}
+
+ @Override
+ public void close() {
+ if (client != null) {
+ client.close();
+ }
+ }
+
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index e60da71472..f7051dbabc 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -47,7 +47,6 @@ import
org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@@ -244,19 +243,10 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
private final CuringParamsService curingParamsService;
- private final String masterAddress;
-
private final DefaultTaskExecuteRunnableFactory
defaultTaskExecuteRunnableFactory;
- /**
- * @param processInstance processInstance
- * @param processService processService
- * @param processInstanceDao processInstanceDao
- * @param masterRpcClient masterRpcClient
- * @param processAlertManager processAlertManager
- * @param masterConfig masterConfig
- * @param stateWheelExecuteThread stateWheelExecuteThread
- */
+ private final MasterConfig masterConfig;
+
public WorkflowExecuteRunnable(
@NonNull ProcessInstance processInstance,
@NonNull CommandService commandService,
@@ -275,12 +265,12 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
this.processInstanceDao = processInstanceDao;
this.processInstance = processInstance;
this.masterRpcClient = masterRpcClient;
+ this.masterConfig = masterConfig;
this.processAlertManager = processAlertManager;
this.stateWheelExecuteThread = stateWheelExecuteThread;
this.curingParamsService = curingParamsService;
this.taskInstanceDao = taskInstanceDao;
this.taskDefinitionLogDao = taskDefinitionLogDao;
- this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
this.defaultTaskExecuteRunnableFactory =
defaultTaskExecuteRunnableFactory;
this.processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
@@ -480,7 +470,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
taskInstance.getState());
this.updateProcessInstanceState();
- sendTaskLogOnMasterToRemoteIfNeeded(taskInstance.getLogPath(),
taskInstance.getHost());
+ sendTaskLogOnMasterToRemoteIfNeeded(taskInstance);
} catch (Exception ex) {
log.error("Task finish failed, get a exception, will remove this
taskInstance from completeTaskSet", ex);
// remove the task from complete map, so that we can finish in the
next time.
@@ -1424,7 +1414,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
try {
Message message =
masterRpcClient.sendSyncCommand(Host.of(taskInstance.getHost()),
- new
WorkflowHostChangeRequest(taskInstance.getId(),
masterAddress).convert2Command());
+ new
WorkflowHostChangeRequest(taskInstance.getId(), masterConfig.getMasterAddress())
+ .convert2Command());
if (message == null) {
log.error(
"Takeover task instance failed, the worker {} might
not be alive, will try to create a new task instance",
@@ -2237,17 +2228,13 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatus> {
}
- private void sendTaskLogOnMasterToRemoteIfNeeded(String logPath, String
host) {
- if (RemoteLogUtils.isRemoteLoggingEnable() &&
isExecutedOnMaster(host)) {
- RemoteLogUtils.sendRemoteLog(logPath);
- log.info("Master sends task log {} to remote storage
asynchronously.", logPath);
+ private void sendTaskLogOnMasterToRemoteIfNeeded(TaskInstance
taskInstance) {
+ if (RemoteLogUtils.isRemoteLoggingEnable() &&
TaskUtils.isMasterTask(taskInstance.getTaskType())) {
+ RemoteLogUtils.sendRemoteLog(taskInstance.getLogPath());
+ log.info("Master sends task log {} to remote storage
asynchronously.", taskInstance.getLogPath());
}
}
- private boolean isExecutedOnMaster(String host) {
- return host.endsWith(masterAddress.split(Constants.COLON)[1]);
- }
-
private void mergeTaskInstanceVarPool(TaskInstance taskInstance) {
String taskVarPoolJson = taskInstance.getVarPool();
if (StringUtils.isEmpty(taskVarPoolJson)) {
diff --git
a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index 27522f3937..3247253511 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -181,28 +181,42 @@ alert:
wait-timeout: 0
heartbeat-interval: 60s
-python-gateway:
- # Weather enable python gateway server or not. The default value is true.
- enabled: true
- # Authentication token for connection from python api to python gateway
server. Should be changed the default value
- # when you deploy in public network.
- auth-token: jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc
- # The address of Python gateway server start. Set its value to `0.0.0.0` if
your Python API run in different
- # between Python gateway server. It could be be specific to other address
like `127.0.0.1` or `localhost`
- gateway-server-address: 0.0.0.0
- # The port of Python gateway server start. Define which port you could
connect to Python gateway server from
- # Python API side.
- gateway-server-port: 25333
- # The address of Python callback client.
- python-address: 127.0.0.1
- # The port of Python callback client.
- python-port: 25334
- # Close connection of socket server if no other request accept after x
milliseconds. Define value is (0 = infinite),
- # and socket server would never close even though no requests accept
- connect-timeout: 0
- # Close each active connection of socket server if python program not active
after x milliseconds. Define value is
- # (0 = infinite), and socket server would never close even though no
requests accept
- read-timeout: 0
+api:
+ audit-enable: false
+ # Traffic control, if you turn on this config, the maximum number of
request/s will be limited.
+ # global max request number per second
+ # default tenant-level max request number
+ traffic-control:
+ global-switch: false
+ max-global-qps-rate: 300
+ tenant-switch: false
+ default-tenant-qps-rate: 10
+ #customize-tenant-qps-rate:
+ # eg.
+ #tenant1: 11
+ #tenant2: 20
+ python-gateway:
+ # Whether to enable the python gateway server or not. The default value is true.
+ enabled: true
+ # Authentication token for connection from python api to python gateway
server. The default value should be changed
+ # when you deploy in a public network.
+ auth-token: jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc
+ # The address of Python gateway server start. Set its value to `0.0.0.0`
if your Python API runs on a different host
+ # from the Python gateway server. It can also be set to a specific address
like `127.0.0.1` or `localhost`
+ gateway-server-address: 0.0.0.0
+ # The port of Python gateway server start. Define which port you could
connect to Python gateway server from
+ # Python API side.
+ gateway-server-port: 25333
+ # The address of Python callback client.
+ python-address: 127.0.0.1
+ # The port of Python callback client.
+ python-port: 25334
+ # Close the connection of the socket server if no other request is accepted after x
milliseconds. A value of 0 means infinite,
+ # and the socket server would never close even though no requests are accepted
+ connect-timeout: 0
+ # Close each active connection of the socket server if the python program is not
active after x milliseconds. A value of 0 means
+ # infinite, and the socket server would never close even though no
requests are accepted
+ read-timeout: 0
server:
port: 12345
@@ -234,9 +248,6 @@ management:
tags:
application: ${spring.application.name}
-audit:
- enabled: true
-
metrics:
enabled: true
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 6d1d776bc8..ae1ce70163 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -21,6 +21,8 @@ import static
org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DO
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import java.time.Duration;
@@ -50,6 +52,9 @@ public class WorkerConfig implements Validator {
private double reservedMemory = 0.1;
private ConnectStrategyProperties registryDisconnectStrategy = new
ConnectStrategyProperties();
+ private NettyClientConfig workerRpcClientConfig = new NettyClientConfig();
+ private NettyServerConfig workerRpcServerConfig = new NettyServerConfig();
+
/**
* This field doesn't need to set at config file, it will be calculated by
workerIp:listenPort
*/
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
index adf5cbccbe..e7a025546e 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
@@ -19,10 +19,10 @@ package org.apache.dolphinscheduler.server.worker.rpc;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Message;
-import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.processor.WorkerRpcProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import java.util.List;
@@ -43,16 +43,17 @@ public class WorkerRpcClient implements AutoCloseable {
@Lazy
private List<WorkerRpcProcessor> workerRpcProcessors;
+ @Autowired
+ private WorkerConfig workerConfig;
+
private NettyRemotingClient nettyRemotingClient;
public void start() {
log.info("Worker rpc client starting");
- NettyClientConfig nettyClientConfig = new NettyClientConfig();
- this.nettyRemotingClient = new NettyRemotingClient(nettyClientConfig);
+ this.nettyRemotingClient = new
NettyRemotingClient(workerConfig.getWorkerRpcClientConfig());
// we only use the client to handle the ack message, we can optimize
this, send ack to the nettyServer.
for (WorkerRpcProcessor workerRpcProcessor : workerRpcProcessors) {
this.nettyRemotingClient.registerProcessor(workerRpcProcessor);
-
}
log.info("Worker rpc client started");
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
index 94b509498a..cf81ad4f55 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
@@ -44,7 +44,7 @@ public class WorkerRpcServer implements Closeable {
public void start() {
log.info("Worker rpc server starting...");
- NettyServerConfig serverConfig = new NettyServerConfig();
+ NettyServerConfig serverConfig =
workerConfig.getWorkerRpcServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
nettyRemotingServer = new NettyRemotingServer(serverConfig);
for (WorkerRpcProcessor workerRpcProcessor : workerRpcProcessors) {