This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 6f53df7fbd [Chore] Optimize the configuration in ServerLoadProtection
(#17315)
6f53df7fbd is described below
commit 6f53df7fbd3f751b2fa357cda6a899947e070157
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Jul 7 11:45:02 2025 +0800
[Chore] Optimize the configuration in ServerLoadProtection (#17315)
---
.../server/master/config/MasterConfig.java | 4 +-
.../master/config/MasterServerLoadProtection.java | 30 ++----
.../config/MasterServerLoadProtectionConfig.java | 42 ++------
.../master/engine/command/CommandEngine.java | 6 +-
.../master/registry/MasterHeartBeatTask.java | 12 +--
.../master/registry/MasterRegistryClient.java | 8 +-
.../server/master/config/MasterConfigTest.java | 28 +-----
.../config/MasterServerLoadProtectionTest.java | 111 +++++++--------------
.../meter/metrics/BaseServerLoadProtection.java | 40 ++++----
.../metrics/BaseServerLoadProtectionConfig.java | 18 +++-
.../server/worker/config/WorkerConfig.java | 2 +-
.../worker/config/WorkerServerLoadProtection.java | 29 ++++++
....java => WorkerServerLoadProtectionConfig.java} | 11 +-
.../worker/registry/WorkerRegistryClient.java | 5 +
.../server/worker/task/WorkerHeartBeatTask.java | 16 +--
.../config/WorkerServerLoadProtectionTest.java | 8 +-
.../worker/registry/WorkerRegistryClientTest.java | 12 +--
17 files changed, 163 insertions(+), 219 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index b2a84f30d0..220062bd57 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -28,7 +28,6 @@ import java.time.Duration;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.Errors;
@@ -56,8 +55,7 @@ public class MasterConfig implements Validator {
*/
private Duration maxHeartbeatInterval = Duration.ofSeconds(10);
- @Autowired
- private MasterServerLoadProtection serverLoadProtection;
+ private MasterServerLoadProtectionConfig serverLoadProtection = new
MasterServerLoadProtectionConfig();
private Duration workerGroupRefreshInterval = Duration.ofMinutes(5);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtection.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtection.java
index 1c47feca25..2dbb199578 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtection.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtection.java
@@ -21,49 +21,41 @@ import
org.apache.dolphinscheduler.meter.metrics.BaseServerLoadProtection;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
-import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
@Slf4j
-@Getter
+@Component
public class MasterServerLoadProtection extends BaseServerLoadProtection {
- private final int maxConcurrentWorkflowInstances;
private final IWorkflowRepository workflowRepository;
+ private final MasterServerLoadProtectionConfig
masterServerLoadProtectionConfig;
+
public MasterServerLoadProtection(IWorkflowRepository workflowRepository,
- int maxConcurrentWorkflowInstances,
- double
maxSystemCpuUsagePercentageThresholds,
- double
maxJvmCpuUsagePercentageThresholds,
- double
maxSystemMemoryUsagePercentageThresholds,
- double maxDiskUsagePercentageThresholds,
- boolean enabled) {
+ MasterConfig masterConfig) {
+ super(masterConfig.getServerLoadProtection());
+ this.masterServerLoadProtectionConfig =
masterConfig.getServerLoadProtection();
this.workflowRepository = workflowRepository;
- this.maxConcurrentWorkflowInstances = maxConcurrentWorkflowInstances;
- this.maxSystemCpuUsagePercentageThresholds =
maxSystemCpuUsagePercentageThresholds;
- this.maxJvmCpuUsagePercentageThresholds =
maxJvmCpuUsagePercentageThresholds;
- this.maxSystemMemoryUsagePercentageThresholds =
maxSystemMemoryUsagePercentageThresholds;
- this.maxDiskUsagePercentageThresholds =
maxDiskUsagePercentageThresholds;
- this.enabled = enabled;
}
@Override
public boolean isOverload(SystemMetrics systemMetrics) {
- if (!enabled) {
+ if (!masterServerLoadProtectionConfig.isEnabled()) {
return false;
}
- // Check system metrics first
if (super.isOverload(systemMetrics)) {
return true;
}
// Check workflow instance count
int currentWorkflowInstanceCount = workflowRepository.getAll().size();
- if (currentWorkflowInstanceCount >= maxConcurrentWorkflowInstances) {
+ if (currentWorkflowInstanceCount >=
masterServerLoadProtectionConfig.getMaxConcurrentWorkflowInstances()) {
log.info(
"OverLoad: the workflow instance count: {} exceeds the
maxConcurrentWorkflowInstances {}",
- currentWorkflowInstanceCount,
maxConcurrentWorkflowInstances);
+ currentWorkflowInstanceCount,
masterServerLoadProtectionConfig.getMaxConcurrentWorkflowInstances());
return true;
}
return false;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionConfig.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionConfig.java
index cd2420ccbc..c1cda12ce0 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionConfig.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionConfig.java
@@ -17,43 +17,15 @@
package org.apache.dolphinscheduler.server.master.config;
-import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
+import
org.apache.dolphinscheduler.meter.metrics.BaseServerLoadProtectionConfig;
-import lombok.extern.slf4j.Slf4j;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class MasterServerLoadProtectionConfig extends
BaseServerLoadProtectionConfig {
-@Slf4j
-@Configuration
-public class MasterServerLoadProtectionConfig {
+ private int maxConcurrentWorkflowInstances = Integer.MAX_VALUE;
- @Bean
- public MasterServerLoadProtection masterServerLoadProtection(
-
IWorkflowRepository workflowRepository,
-
@Value("${master.server-load-protection.max-concurrent-workflow-instances:2147483647}")
int maxConcurrentWorkflowInstances,
-
@Value("${master.server-load-protection.max-system-cpu-usage-percentage-thresholds:0.7}")
double maxSystemCpuUsagePercentageThresholds,
-
@Value("${master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds:0.7}")
double maxJvmCpuUsagePercentageThresholds,
-
@Value("${master.server-load-protection.max-system-memory-usage-percentage-thresholds:0.7}")
double maxSystemMemoryUsagePercentageThresholds,
-
@Value("${master.server-load-protection.max-disk-usage-percentage-thresholds:0.7}")
double maxDiskUsagePercentageThresholds,
-
@Value("${master.server-load-protection.enabled:true}") boolean enabled) {
- MasterServerLoadProtection protection =
- new MasterServerLoadProtection(workflowRepository,
- maxConcurrentWorkflowInstances,
- maxSystemCpuUsagePercentageThresholds,
- maxJvmCpuUsagePercentageThresholds,
- maxSystemMemoryUsagePercentageThresholds,
- maxDiskUsagePercentageThresholds,
- enabled);
- log.info(
- "Initialized MasterServerLoadProtection with
IWorkflowRepository and maxConcurrentWorkflowInstances={}, "
- +
- "maxSystemCpuUsagePercentageThresholds={},
maxJvmCpuUsagePercentageThresholds={}, " +
- "maxSystemMemoryUsagePercentageThresholds={},
maxDiskUsagePercentageThresholds={}, enabled={}",
- maxConcurrentWorkflowInstances,
maxSystemCpuUsagePercentageThresholds,
- maxJvmCpuUsagePercentageThresholds,
- maxSystemMemoryUsagePercentageThresholds,
maxDiskUsagePercentageThresholds, enabled);
- return protection;
- }
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java
index 0f31a6971f..be681eb1b1 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java
@@ -68,6 +68,9 @@ public class CommandEngine extends BaseDaemonThread
implements AutoCloseable {
@Autowired
private MasterConfig masterConfig;
+ @Autowired
+ private MasterServerLoadProtection masterServerLoadProtection;
+
@Autowired
private IWorkflowRepository workflowRepository;
@@ -107,12 +110,11 @@ public class CommandEngine extends BaseDaemonThread
implements AutoCloseable {
@Override
public void run() {
- MasterServerLoadProtection serverLoadProtection =
masterConfig.getServerLoadProtection();
while (flag) {
try {
// todo: if the workflow event queue is much, we need to
handle the back pressure
SystemMetrics systemMetrics =
metricsProvider.getSystemMetrics();
- if (serverLoadProtection.isOverload(systemMetrics)) {
+ if (masterServerLoadProtection.isOverload(systemMetrics)) {
log.warn("The current server is overload, cannot consumes
commands.");
MasterServerMetrics.incMasterOverload();
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
index 3282248401..fe4b626094 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
@@ -41,6 +41,8 @@ public class MasterHeartBeatTask extends
BaseHeartBeatTask<MasterHeartBeat> {
private final MasterConfig masterConfig;
+ private MasterServerLoadProtection masterServerLoadProtection;
+
private final MetricsProvider metricsProvider;
private final RegistryClient registryClient;
@@ -52,11 +54,13 @@ public class MasterHeartBeatTask extends
BaseHeartBeatTask<MasterHeartBeat> {
private final int processId;
public MasterHeartBeatTask(@NonNull MasterConfig masterConfig,
+ @NonNull MasterServerLoadProtection
masterServerLoadProtection,
@NonNull MetricsProvider metricsProvider,
@NonNull RegistryClient registryClient,
@NonNull MasterCoordinator masterCoordinator) {
super("MasterHeartBeatTask",
masterConfig.getMaxHeartbeatInterval().toMillis());
this.masterConfig = masterConfig;
+ this.masterServerLoadProtection = masterServerLoadProtection;
this.metricsProvider = metricsProvider;
this.registryClient = registryClient;
this.masterCoordinator = masterCoordinator;
@@ -67,7 +71,6 @@ public class MasterHeartBeatTask extends
BaseHeartBeatTask<MasterHeartBeat> {
@Override
public MasterHeartBeat getHeartBeat() {
SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
- ServerStatus serverStatus = getServerStatus(systemMetrics,
masterConfig.getServerLoadProtection());
return MasterHeartBeat.builder()
.startupTime(ServerLifeCycleManager.getServerStartupTime())
.reportTime(System.currentTimeMillis())
@@ -81,7 +84,8 @@ public class MasterHeartBeatTask extends
BaseHeartBeatTask<MasterHeartBeat> {
.memoryUsage(systemMetrics.getSystemMemoryUsedPercentage())
.diskUsage(systemMetrics.getDiskUsedPercentage())
.processId(processId)
- .serverStatus(serverStatus)
+ .serverStatus(
+ masterServerLoadProtection.isOverload(systemMetrics) ?
ServerStatus.BUSY : ServerStatus.NORMAL)
.host(NetUtils.getHost())
.port(masterConfig.getListenPort())
.isCoordinator(masterCoordinator.isActive())
@@ -108,8 +112,4 @@ public class MasterHeartBeatTask extends
BaseHeartBeatTask<MasterHeartBeat> {
masterHeartBeatJson);
}
- private ServerStatus getServerStatus(final SystemMetrics systemMetrics,
- final MasterServerLoadProtection
masterServerLoadProtection) {
- return masterServerLoadProtection.isOverload(systemMetrics) ?
ServerStatus.BUSY : ServerStatus.NORMAL;
- }
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index f09fc51683..fff19b28a0 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -28,6 +28,7 @@ import
org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import
org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection;
import org.apache.dolphinscheduler.server.master.engine.MasterCoordinator;
import lombok.extern.slf4j.Slf4j;
@@ -49,6 +50,9 @@ public class MasterRegistryClient implements AutoCloseable {
@Autowired
private MasterConfig masterConfig;
+ @Autowired
+ private MasterServerLoadProtection masterServerLoadProtection;
+
@Autowired
private MetricsProvider metricsProvider;
@@ -59,8 +63,8 @@ public class MasterRegistryClient implements AutoCloseable {
public void start() {
try {
- this.masterHeartBeatTask =
- new MasterHeartBeatTask(masterConfig, metricsProvider,
registryClient, masterCoordinator);
+ this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig,
masterServerLoadProtection,
+ metricsProvider, registryClient, masterCoordinator);
// master registry
registry();
registryClient.addConnectionStateListener(new
MasterConnectionStateListener(registryClient));
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java
index 569dbac5ec..991d6c249e 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java
@@ -20,49 +20,25 @@ package org.apache.dolphinscheduler.server.master.config;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
import
org.apache.dolphinscheduler.server.master.cluster.loadbalancer.WorkerLoadBalancerConfigurationProperties;
import
org.apache.dolphinscheduler.server.master.cluster.loadbalancer.WorkerLoadBalancerType;
-import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.test.context.TestPropertySource;
@AutoConfigureMockMvc
-@SpringBootTest(classes = {
- MasterConfig.class,
- MasterServerLoadProtectionConfig.class,
- MasterConfigTest.TestBeans.class
-})
-@TestPropertySource(properties = {
-
"master.server-load-protection.max-system-cpu-usage-percentage-thresholds=0.9",
-
"master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds=0.9",
-
"master.server-load-protection.max-system-memory-usage-percentage-thresholds=0.9",
-
"master.server-load-protection.max-disk-usage-percentage-thresholds=0.9"
-})
+@SpringBootTest(classes = MasterConfig.class)
public class MasterConfigTest {
@Autowired
private MasterConfig masterConfig;
- @TestConfiguration
- static class TestBeans {
-
- @Bean
- public IWorkflowRepository workflowRepository() {
- return mock(IWorkflowRepository.class);
- }
- }
-
@Test
public void getServerLoadProtection() {
- MasterServerLoadProtection serverLoadProtection =
masterConfig.getServerLoadProtection();
+ MasterServerLoadProtectionConfig serverLoadProtection =
masterConfig.getServerLoadProtection();
assertTrue(serverLoadProtection.isEnabled());
assertEquals(0.9,
serverLoadProtection.getMaxSystemCpuUsagePercentageThresholds());
assertEquals(0.9,
serverLoadProtection.getMaxJvmCpuUsagePercentageThresholds());
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionTest.java
index 45b75ae7df..40cfe67c47 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionTest.java
@@ -17,14 +17,15 @@
package org.apache.dolphinscheduler.server.master.config;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
-import java.util.Collection;
import java.util.Collections;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -33,10 +34,6 @@ class MasterServerLoadProtectionTest {
private IWorkflowRepository mockRepository;
- private static final double DEFAULT_THRESHOLD = 0.7;
- private static final double LOW_THRESHOLD = 0.5;
- private static final boolean isEnabled = true;
-
@BeforeEach
public void setUp() {
mockRepository = Mockito.mock(IWorkflowRepository.class);
@@ -45,9 +42,9 @@ class MasterServerLoadProtectionTest {
@Test
void isOverload() {
+ final MasterConfig masterConfig = new MasterConfig();
MasterServerLoadProtection masterServerLoadProtection =
- new MasterServerLoadProtection(mockRepository,
Integer.MAX_VALUE, DEFAULT_THRESHOLD, DEFAULT_THRESHOLD,
- DEFAULT_THRESHOLD, DEFAULT_THRESHOLD, isEnabled);
+ new MasterServerLoadProtection(mockRepository, masterConfig);
SystemMetrics systemMetrics = SystemMetrics.builder()
.jvmMemoryUsedPercentage(0.71)
@@ -57,11 +54,11 @@ class MasterServerLoadProtectionTest {
.diskUsedPercentage(0.71)
.build();
- masterServerLoadProtection.setEnabled(false);
-
Assertions.assertFalse(masterServerLoadProtection.isOverload(systemMetrics));
+ masterConfig.getServerLoadProtection().setEnabled(false);
+ assertFalse(masterServerLoadProtection.isOverload(systemMetrics));
- masterServerLoadProtection.setEnabled(true);
-
Assertions.assertTrue(masterServerLoadProtection.isOverload(systemMetrics));
+ masterConfig.getServerLoadProtection().setEnabled(true);
+ assertTrue(masterServerLoadProtection.isOverload(systemMetrics));
}
@Test
@@ -70,8 +67,9 @@ class MasterServerLoadProtectionTest {
* Set custom thresholds higher than the metrics values.
* With higher thresholds, the system should not be overloaded
*/
+ final MasterConfig masterConfig = new MasterConfig();
MasterServerLoadProtection masterServerLoadProtection =
- new MasterServerLoadProtection(mockRepository,
Integer.MAX_VALUE, 0.8, 0.8, 0.8, 0.8, true);
+ new MasterServerLoadProtection(mockRepository, masterConfig);
SystemMetrics systemMetrics = SystemMetrics.builder()
.jvmMemoryUsedPercentage(0.71)
@@ -81,51 +79,45 @@ class MasterServerLoadProtectionTest {
.diskUsedPercentage(0.71)
.build();
-
Assertions.assertFalse(masterServerLoadProtection.isOverload(systemMetrics));
+
masterConfig.getServerLoadProtection().setMaxJvmCpuUsagePercentageThresholds(0.8);
+
masterConfig.getServerLoadProtection().setMaxSystemCpuUsagePercentageThresholds(0.8);
+
masterConfig.getServerLoadProtection().setMaxSystemMemoryUsagePercentageThresholds(0.8);
+
masterConfig.getServerLoadProtection().setMaxDiskUsagePercentageThresholds(0.8);
+ assertFalse(masterServerLoadProtection.isOverload(systemMetrics));
/**
* Now set custom thresholds lower than the metrics values.
* With a lower system CPU, Memory & Disk threshold, the system should
be overloaded.
*/
- MasterServerLoadProtection masterServerLoadProtection2 =
- new MasterServerLoadProtection(mockRepository,
Integer.MAX_VALUE, 0.6, 0.8, 0.8, 0.8, true);
-
-
Assertions.assertTrue(masterServerLoadProtection2.isOverload(systemMetrics));
-
- MasterServerLoadProtection masterServerLoadProtection3 =
- new MasterServerLoadProtection(mockRepository,
Integer.MAX_VALUE, 0.8, 0.6, 0.8, 0.8, true);
-
-
Assertions.assertTrue(masterServerLoadProtection3.isOverload(systemMetrics));
-
- MasterServerLoadProtection masterServerLoadProtection4 =
- new MasterServerLoadProtection(mockRepository,
Integer.MAX_VALUE, 0.8, 0.8, 0.6, 0.8, true);
+
masterConfig.getServerLoadProtection().setMaxJvmCpuUsagePercentageThresholds(0.7);
+ assertTrue(masterServerLoadProtection.isOverload(systemMetrics));
+
masterConfig.getServerLoadProtection().setMaxJvmCpuUsagePercentageThresholds(0.8);
-
Assertions.assertTrue(masterServerLoadProtection4.isOverload(systemMetrics));
+
masterConfig.getServerLoadProtection().setMaxSystemCpuUsagePercentageThresholds(0.7);
+ assertTrue(masterServerLoadProtection.isOverload(systemMetrics));
+
masterConfig.getServerLoadProtection().setMaxSystemCpuUsagePercentageThresholds(0.8);
- MasterServerLoadProtection masterServerLoadProtection5 =
- new MasterServerLoadProtection(mockRepository,
Integer.MAX_VALUE, 0.8, 0.8, 0.8, 0.6, true);
+
masterConfig.getServerLoadProtection().setMaxSystemMemoryUsagePercentageThresholds(0.7);
+ assertTrue(masterServerLoadProtection.isOverload(systemMetrics));
+
masterConfig.getServerLoadProtection().setMaxSystemMemoryUsagePercentageThresholds(0.8);
-
Assertions.assertTrue(masterServerLoadProtection5.isOverload(systemMetrics));
-
- MasterServerLoadProtection masterServerLoadProtection6 =
- new MasterServerLoadProtection(mockRepository,
Integer.MAX_VALUE, 0.6, 0.6, 0.6, 0.6, true);
-
-
Assertions.assertTrue(masterServerLoadProtection6.isOverload(systemMetrics));
+
masterConfig.getServerLoadProtection().setMaxDiskUsagePercentageThresholds(0.7);
+ assertTrue(masterServerLoadProtection.isOverload(systemMetrics));
+
masterConfig.getServerLoadProtection().setMaxDiskUsagePercentageThresholds(0.8);
}
@Test
void isOverloadWithMaxConcurrentWorkflowInstances() {
- Collection<IWorkflowExecutionRunnable> mockWorkflows =
- Collections.nCopies(5,
Mockito.mock(IWorkflowExecutionRunnable.class));
- Mockito.when(mockRepository.getAll()).thenReturn(mockWorkflows);
+ Mockito.when(mockRepository.getAll())
+ .thenReturn(Collections.nCopies(5,
Mockito.mock(IWorkflowExecutionRunnable.class)));
// With a workflow count below the threshold, the system should not be
overloaded.
+ MasterConfig masterConfig = new MasterConfig();
MasterServerLoadProtection masterServerLoadProtection =
- new MasterServerLoadProtection(mockRepository, 10, 0.7, 0.7,
0.7, 0.7, true);
+ new MasterServerLoadProtection(mockRepository, masterConfig);
- masterServerLoadProtection.setEnabled(true);
-
Assertions.assertFalse(masterServerLoadProtection.isOverload(SystemMetrics.builder()
+
assertFalse(masterServerLoadProtection.isOverload(SystemMetrics.builder()
.jvmMemoryUsedPercentage(0.5)
.systemMemoryUsedPercentage(0.5)
.systemCpuUsagePercentage(0.5)
@@ -134,21 +126,8 @@ class MasterServerLoadProtectionTest {
.build()));
// With a workflow count anything >= maxConcurrentWorkflowInstances
threshold, the system should be overloaded.
- MasterServerLoadProtection masterServerLoadProtection2 =
- new MasterServerLoadProtection(mockRepository, 5, 0.7, 0.7,
0.7, 0.7, true);
- masterServerLoadProtection2.setEnabled(true);
-
Assertions.assertTrue(masterServerLoadProtection2.isOverload(SystemMetrics.builder()
- .jvmMemoryUsedPercentage(0.5)
- .systemMemoryUsedPercentage(0.5)
- .systemCpuUsagePercentage(0.5)
- .jvmCpuUsagePercentage(0.5)
- .diskUsedPercentage(0.5)
- .build()));
-
- MasterServerLoadProtection masterServerLoadProtection3 =
- new MasterServerLoadProtection(mockRepository, 3, 0.7, 0.7,
0.7, 0.7, true);
- masterServerLoadProtection3.setEnabled(true);
-
Assertions.assertTrue(masterServerLoadProtection3.isOverload(SystemMetrics.builder()
+
masterConfig.getServerLoadProtection().setMaxConcurrentWorkflowInstances(5);
+
assertTrue(masterServerLoadProtection.isOverload(SystemMetrics.builder()
.jvmMemoryUsedPercentage(0.5)
.systemMemoryUsedPercentage(0.5)
.systemCpuUsagePercentage(0.5)
@@ -157,24 +136,4 @@ class MasterServerLoadProtectionTest {
.build()));
}
- @Test
- void isNotOverloadWhenAllMetricsAreFine() {
- Collection<IWorkflowExecutionRunnable> mockWorkflows =
Collections.nCopies(5,
- Mockito.mock(IWorkflowExecutionRunnable.class));
- Mockito.when(mockRepository.getAll()).thenReturn(mockWorkflows);
-
- MasterServerLoadProtection masterServerLoadProtection =
- new MasterServerLoadProtection(mockRepository, 10,
DEFAULT_THRESHOLD,
- DEFAULT_THRESHOLD, DEFAULT_THRESHOLD,
DEFAULT_THRESHOLD, isEnabled);
- masterServerLoadProtection.setEnabled(true);
-
- SystemMetrics lowUsageMetrics = SystemMetrics.builder()
- .systemCpuUsagePercentage(LOW_THRESHOLD)
- .jvmCpuUsagePercentage(LOW_THRESHOLD)
- .systemMemoryUsedPercentage(LOW_THRESHOLD)
- .diskUsedPercentage(LOW_THRESHOLD)
- .build();
-
-
Assertions.assertFalse(masterServerLoadProtection.isOverload(lowUsageMetrics));
- }
}
diff --git
a/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/BaseServerLoadProtection.java
b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/BaseServerLoadProtection.java
index fd12d3bb66..7d5ae76a40 100644
---
a/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/BaseServerLoadProtection.java
+++
b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/BaseServerLoadProtection.java
@@ -17,49 +17,51 @@
package org.apache.dolphinscheduler.meter.metrics;
-import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@Slf4j
-@Data
public class BaseServerLoadProtection implements ServerLoadProtection {
- protected boolean enabled = true;
+ protected final BaseServerLoadProtectionConfig
baseServerLoadProtectionConfig;
- protected double maxSystemCpuUsagePercentageThresholds = 0.7;
-
- protected double maxJvmCpuUsagePercentageThresholds = 0.7;
-
- protected double maxSystemMemoryUsagePercentageThresholds = 0.7;
-
- protected double maxDiskUsagePercentageThresholds = 0.7;
+ public BaseServerLoadProtection(BaseServerLoadProtectionConfig
baseServerLoadProtectionConfig) {
+ this.baseServerLoadProtectionConfig = baseServerLoadProtectionConfig;
+ }
@Override
public boolean isOverload(SystemMetrics systemMetrics) {
- if (!enabled) {
+ if (!baseServerLoadProtectionConfig.isEnabled()) {
return false;
}
- if (systemMetrics.getSystemCpuUsagePercentage() >
maxSystemCpuUsagePercentageThresholds) {
+ if (systemMetrics.getSystemCpuUsagePercentage() >
baseServerLoadProtectionConfig
+ .getMaxSystemCpuUsagePercentageThresholds()) {
log.info(
"OverLoad: the system cpu usage: {} is over then the
maxSystemCpuUsagePercentageThresholds {}",
- systemMetrics.getSystemCpuUsagePercentage(),
maxSystemCpuUsagePercentageThresholds);
+ systemMetrics.getSystemCpuUsagePercentage(),
+
baseServerLoadProtectionConfig.getMaxSystemCpuUsagePercentageThresholds());
return true;
}
- if (systemMetrics.getJvmCpuUsagePercentage() >
maxJvmCpuUsagePercentageThresholds) {
+ if (systemMetrics.getJvmCpuUsagePercentage() >
baseServerLoadProtectionConfig
+ .getMaxJvmCpuUsagePercentageThresholds()) {
log.info(
"OverLoad: the jvm cpu usage: {} is over then the
maxJvmCpuUsagePercentageThresholds {}",
- systemMetrics.getJvmCpuUsagePercentage(),
maxJvmCpuUsagePercentageThresholds);
+ systemMetrics.getJvmCpuUsagePercentage(),
+
baseServerLoadProtectionConfig.getMaxJvmCpuUsagePercentageThresholds());
return true;
}
- if (systemMetrics.getDiskUsedPercentage() >
maxDiskUsagePercentageThresholds) {
+ if (systemMetrics.getDiskUsedPercentage() >
baseServerLoadProtectionConfig
+ .getMaxDiskUsagePercentageThresholds()) {
log.info("OverLoad: the DiskUsedPercentage: {} is over then the
maxDiskUsagePercentageThresholds {}",
- systemMetrics.getDiskUsedPercentage(),
maxDiskUsagePercentageThresholds);
+ systemMetrics.getDiskUsedPercentage(),
+
baseServerLoadProtectionConfig.getMaxDiskUsagePercentageThresholds());
return true;
}
- if (systemMetrics.getSystemMemoryUsedPercentage() >
maxSystemMemoryUsagePercentageThresholds) {
+ if (systemMetrics.getSystemMemoryUsedPercentage() >
baseServerLoadProtectionConfig
+ .getMaxSystemMemoryUsagePercentageThresholds()) {
log.info(
"OverLoad: the SystemMemoryUsedPercentage: {} is over then
the maxSystemMemoryUsagePercentageThresholds {}",
- systemMetrics.getSystemMemoryUsedPercentage(),
maxSystemMemoryUsagePercentageThresholds);
+ systemMetrics.getSystemMemoryUsedPercentage(),
+
baseServerLoadProtectionConfig.getMaxSystemMemoryUsagePercentageThresholds());
return true;
}
return false;
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtection.java
b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/BaseServerLoadProtectionConfig.java
similarity index 65%
copy from
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtection.java
copy to
dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/BaseServerLoadProtectionConfig.java
index 1a52100eb2..f10b63cd18 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtection.java
+++
b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/BaseServerLoadProtectionConfig.java
@@ -15,13 +15,21 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.worker.config;
+package org.apache.dolphinscheduler.meter.metrics;
-import org.apache.dolphinscheduler.meter.metrics.BaseServerLoadProtection;
+import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
+@Data
+public abstract class BaseServerLoadProtectionConfig {
-@Slf4j
-public class WorkerServerLoadProtection extends BaseServerLoadProtection {
+ protected boolean enabled = true;
+
+ protected double maxSystemCpuUsagePercentageThresholds = 0.7;
+
+ protected double maxJvmCpuUsagePercentageThresholds = 0.7;
+
+ protected double maxSystemMemoryUsagePercentageThresholds = 0.7;
+
+ protected double maxDiskUsagePercentageThresholds = 0.7;
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 5f3d3cddf6..317b399256 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -43,7 +43,7 @@ public class WorkerConfig implements Validator {
private int listenPort = 1234;
private Duration maxHeartbeatInterval = Duration.ofSeconds(10);
private int hostWeight = 100;
- private WorkerServerLoadProtection serverLoadProtection = new
WorkerServerLoadProtection();
+ private WorkerServerLoadProtectionConfig serverLoadProtection = new
WorkerServerLoadProtectionConfig();
private String group;
/**
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtection.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtection.java
index 1a52100eb2..e21e18a5af 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtection.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtection.java
@@ -18,10 +18,39 @@
package org.apache.dolphinscheduler.server.worker.config;
import org.apache.dolphinscheduler.meter.metrics.BaseServerLoadProtection;
+import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
+import
org.apache.dolphinscheduler.server.worker.executor.PhysicalTaskExecutorContainerProvider;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
@Slf4j
+@Component
public class WorkerServerLoadProtection extends BaseServerLoadProtection {
+ @Autowired
+ private PhysicalTaskExecutorContainerProvider
physicalTaskExecutorContainerDelegator;
+
+ public WorkerServerLoadProtection(WorkerConfig workerConfig) {
+ super(workerConfig.getServerLoadProtection());
+ }
+
+ @Override
+ public boolean isOverload(SystemMetrics systemMetrics) {
+ if (!baseServerLoadProtectionConfig.isEnabled()) {
+ return false;
+ }
+
+ if (super.isOverload(systemMetrics)) {
+ return true;
+ }
+
+ if
(physicalTaskExecutorContainerDelegator.getExecutorContainer().slotUsage() ==
1) {
+ log.info("OverLoad: the TaskExecutorContainer slot usage is 1");
+ return true;
+ }
+ return false;
+ }
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtection.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtectionConfig.java
similarity index 82%
copy from
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtection.java
copy to
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtectionConfig.java
index 1a52100eb2..b024ac15cf 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtection.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtectionConfig.java
@@ -17,11 +17,12 @@
package org.apache.dolphinscheduler.server.worker.config;
-import org.apache.dolphinscheduler.meter.metrics.BaseServerLoadProtection;
+import
org.apache.dolphinscheduler.meter.metrics.BaseServerLoadProtectionConfig;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class WorkerServerLoadProtection extends BaseServerLoadProtection {
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class WorkerServerLoadProtectionConfig extends
BaseServerLoadProtectionConfig {
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index 225cef6fa7..a4b4841212 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -29,6 +29,7 @@ import
org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import
org.apache.dolphinscheduler.server.worker.config.WorkerServerLoadProtection;
import
org.apache.dolphinscheduler.server.worker.executor.PhysicalTaskExecutorContainerProvider;
import org.apache.dolphinscheduler.server.worker.task.WorkerHeartBeatTask;
@@ -52,6 +53,9 @@ public class WorkerRegistryClient implements AutoCloseable {
@Autowired
private WorkerConfig workerConfig;
+ @Autowired
+ private WorkerServerLoadProtection workerServerLoadProtection;
+
@Autowired
private PhysicalTaskExecutorContainerProvider
physicalTaskExecutorContainerDelegator;
@@ -67,6 +71,7 @@ public class WorkerRegistryClient implements AutoCloseable {
public void initWorkRegistry() {
this.workerHeartBeatTask = new WorkerHeartBeatTask(
workerConfig,
+ workerServerLoadProtection,
metricsProvider,
registryClient,
physicalTaskExecutorContainerDelegator.getExecutorContainer());
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
index 2e157f6fd9..47d0f9c7dd 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
@@ -40,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {
private final WorkerConfig workerConfig;
+ private final WorkerServerLoadProtection workerServerLoadProtection;
private final RegistryClient registryClient;
private final MetricsProvider metricsProvider;
@@ -49,10 +50,12 @@ public class WorkerHeartBeatTask extends
BaseHeartBeatTask<WorkerHeartBeat> {
private final ITaskExecutorContainer taskExecutorContainer;
public WorkerHeartBeatTask(@NonNull WorkerConfig workerConfig,
+ @NonNull WorkerServerLoadProtection
workerServerLoadProtection,
@NonNull MetricsProvider metricsProvider,
@NonNull RegistryClient registryClient,
@NonNull ITaskExecutorContainer
taskExecutorContainer) {
super("WorkerHeartBeatTask",
workerConfig.getMaxHeartbeatInterval().toMillis());
+ this.workerServerLoadProtection = workerServerLoadProtection;
this.metricsProvider = metricsProvider;
this.workerConfig = workerConfig;
this.registryClient = registryClient;
@@ -63,7 +66,6 @@ public class WorkerHeartBeatTask extends
BaseHeartBeatTask<WorkerHeartBeat> {
@Override
public WorkerHeartBeat getHeartBeat() {
SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
- ServerStatus serverStatus = getServerStatus(systemMetrics,
workerConfig, taskExecutorContainer);
return WorkerHeartBeat.builder()
.startupTime(ServerLifeCycleManager.getServerStartupTime())
@@ -80,7 +82,8 @@ public class WorkerHeartBeatTask extends
BaseHeartBeatTask<WorkerHeartBeat> {
.processId(processId)
.workerHostWeight(workerConfig.getHostWeight())
.threadPoolUsage(taskExecutorContainer.slotUsage())
- .serverStatus(serverStatus)
+ .serverStatus(
+ workerServerLoadProtection.isOverload(systemMetrics) ?
ServerStatus.BUSY : ServerStatus.NORMAL)
.host(NetUtils.getHost())
.port(workerConfig.getListenPort())
.workerGroup(workerConfig.getGroup())
@@ -109,13 +112,4 @@ public class WorkerHeartBeatTask extends
BaseHeartBeatTask<WorkerHeartBeat> {
workerHeartBeatJson);
}
- private ServerStatus getServerStatus(SystemMetrics systemMetrics,
- WorkerConfig workerConfig,
- ITaskExecutorContainer
taskExecutorContainer) {
- if (taskExecutorContainer.slotUsage() == 1) {
- return ServerStatus.BUSY;
- }
- WorkerServerLoadProtection serverLoadProtection =
workerConfig.getServerLoadProtection();
- return serverLoadProtection.isOverload(systemMetrics) ?
ServerStatus.BUSY : ServerStatus.NORMAL;
- }
}
diff --git
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtectionTest.java
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtectionTest.java
index 204deb120e..e393cfa9c3 100644
---
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtectionTest.java
+++
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/WorkerServerLoadProtectionTest.java
@@ -26,7 +26,8 @@ class WorkerServerLoadProtectionTest {
@Test
void isOverload() {
- WorkerServerLoadProtection workerServerLoadProtection = new
WorkerServerLoadProtection();
+ WorkerConfig workerConfig = new WorkerConfig();
+ WorkerServerLoadProtection workerServerLoadProtection = new
WorkerServerLoadProtection(workerConfig);
SystemMetrics systemMetrics = SystemMetrics.builder()
.jvmMemoryUsedPercentage(0.71)
.systemMemoryUsedPercentage(0.71)
@@ -34,10 +35,11 @@ class WorkerServerLoadProtectionTest {
.jvmCpuUsagePercentage(0.71)
.diskUsedPercentage(0.71)
.build();
- workerServerLoadProtection.setEnabled(false);
+
+ workerConfig.getServerLoadProtection().setEnabled(false);
Assertions.assertFalse(workerServerLoadProtection.isOverload(systemMetrics));
- workerServerLoadProtection.setEnabled(true);
+ workerConfig.getServerLoadProtection().setEnabled(true);
Assertions.assertTrue(workerServerLoadProtection.isOverload(systemMetrics));
}
}
diff --git
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
index 636ba2e461..739b751973 100644
---
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
+++
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
@@ -34,7 +34,6 @@ import
org.apache.dolphinscheduler.task.executor.container.TaskExecutorContainer
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Optional;
import org.junit.jupiter.api.Assertions;
@@ -45,6 +44,8 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
+import com.google.common.collect.Lists;
+
/**
* worker registry test
*/
@@ -56,6 +57,8 @@ public class WorkerRegistryClientTest {
@Mock
private RegistryClient registryClient;
@Mock
+ private WorkerServerLoadProtection workerServerLoadProtection;
+ @Mock
private WorkerConfig workerConfig;
@Mock
private MetricsProvider metricsProvider;
@@ -77,7 +80,6 @@ public class WorkerRegistryClientTest {
given(workerConfig.getWorkerAddress()).willReturn(NetUtils.getAddr(1234));
given(workerConfig.getMaxHeartbeatInterval()).willReturn(Duration.ofSeconds(1));
- given(workerConfig.getServerLoadProtection()).willReturn(new
WorkerServerLoadProtection());
given(metricsProvider.getSystemMetrics()).willReturn(new
SystemMetrics());
given(registryClient.checkNodeExists(Mockito.anyString(),
Mockito.any(RegistryNodeType.class)))
.willReturn(true);
@@ -90,8 +92,7 @@ public class WorkerRegistryClientTest {
@Test
public void testWorkerRegistryClientgetAlertServerAddress() {
-
given(registryClient.getServerList(Mockito.any(RegistryNodeType.class)))
- .willReturn(new ArrayList<Server>());
+
given(registryClient.getServerList(Mockito.any(RegistryNodeType.class))).willReturn(new
ArrayList<>());
Assertions.assertEquals(workerRegistryClient.getAlertServerAddress(),
Optional.empty());
Mockito.reset(registryClient);
String host = "test";
@@ -99,8 +100,7 @@ public class WorkerRegistryClientTest {
Server server = new Server();
server.setHost(host);
server.setPort(port);
-
given(registryClient.getServerList(Mockito.any(RegistryNodeType.class)))
- .willReturn(new ArrayList<Server>(Arrays.asList(server)));
+
given(registryClient.getServerList(Mockito.any(RegistryNodeType.class))).willReturn(Lists.newArrayList(server));
Assertions.assertEquals(workerRegistryClient.getAlertServerAddress().get().getAddress(),
String.format("%s:%d", host, port));
}