This is an automated email from the ASF dual-hosted git repository. wenjun pushed a commit to branch 3.0.0-prepare in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 4b224ae2e513b1124a0abaa4b5cde4b964d27a28 Author: Wenjun Ruan <[email protected]> AuthorDate: Tue Jun 28 20:17:43 2022 +0800 Validate master/worker config (#10649) (cherry picked from commit 35b25da863e38df817bfe3cb2ef758d426b8b699) --- .../server/master/config/MasterConfig.java | 206 +++++++-------------- .../master/registry/MasterRegistryClient.java | 5 +- .../master/runner/FailoverExecuteThread.java | 2 +- .../master/runner/StateWheelExecuteThread.java | 2 +- .../master/runner/task/BaseTaskProcessor.java | 5 +- .../src/main/resources/application.yaml | 10 +- .../server/master/BlockingTaskTest.java | 5 +- .../server/master/ConditionsTaskTest.java | 3 +- .../server/master/DependentTaskTest.java | 3 +- .../server/master/SubProcessTaskTest.java | 3 +- .../server/master/SwitchTaskTest.java | 3 +- .../service/process/ProcessService.java | 2 +- .../service/process/ProcessServiceImpl.java | 2 +- .../src/main/resources/application.yaml | 16 +- .../server/worker/config/WorkerConfig.java | 131 ++++--------- .../worker/registry/WorkerRegistryClient.java | 2 +- .../src/main/resources/application.yaml | 4 +- .../worker/registry/WorkerRegistryClientTest.java | 13 +- 18 files changed, 156 insertions(+), 261 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index 76d4ae1525..0494b7dd00 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -21,170 +21,108 @@ import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelect import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; -import 
org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.stereotype.Component; +import java.time.Duration; -@Component -@EnableConfigurationProperties -@ConfigurationProperties("master") -public class MasterConfig { +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.validation.Errors; +import org.springframework.validation.Validator; +import org.springframework.validation.annotation.Validated; + +import lombok.Data; + +@Data +@Validated +@Configuration +@ConfigurationProperties(prefix = "master") +public class MasterConfig implements Validator { /** * The master RPC server listen port. */ - private int listenPort; + private int listenPort = 5678; /** * The max batch size used to fetch command from database. */ - private int fetchCommandNum; + private int fetchCommandNum = 10; /** * The thread number used to prepare processInstance. This number shouldn't bigger than fetchCommandNum. */ - private int preExecThreads; + private int preExecThreads = 10; /** * todo: We may need to split the process/task into different thread size. * The thread number used to handle processInstance and task event. * Will create two thread poll to execute {@link WorkflowExecuteRunnable} and {@link TaskExecuteRunnable}. */ - private int execThreads; + private int execThreads = 10; /** * The task dispatch thread pool size. */ - private int dispatchTaskNumber; + private int dispatchTaskNumber = 3; /** * Worker select strategy. */ - private HostSelector hostSelector; + private HostSelector hostSelector = HostSelector.LOWER_WEIGHT; /** * Master heart beat task execute interval. */ - private int heartbeatInterval; + private Duration heartbeatInterval = Duration.ofSeconds(10); /** * task submit max retry times. 
*/ - private int taskCommitRetryTimes; + private int taskCommitRetryTimes = 5; /** - * task submit retry interval/ms. + * task submit retry interval. */ - private int taskCommitInterval; + private Duration taskCommitInterval = Duration.ofSeconds(1); /** - * state wheel check interval/ms, if this value is bigger, may increase the delay of task/processInstance. + * state wheel check interval, if this value is bigger, may increase the delay of task/processInstance. */ - private int stateWheelInterval; - private double maxCpuLoadAvg; - private double reservedMemory; - private int failoverInterval; - private boolean killYarnJobWhenTaskFailover; - - public int getListenPort() { - return listenPort; - } - - public void setListenPort(int listenPort) { - this.listenPort = listenPort; - } - - public int getFetchCommandNum() { - return fetchCommandNum; - } - - public void setFetchCommandNum(int fetchCommandNum) { - this.fetchCommandNum = fetchCommandNum; - } - - public int getPreExecThreads() { - return preExecThreads; - } - - public void setPreExecThreads(int preExecThreads) { - this.preExecThreads = preExecThreads; - } - - public int getExecThreads() { - return execThreads; - } - - public void setExecThreads(int execThreads) { - this.execThreads = execThreads; - } - - public int getDispatchTaskNumber() { - return dispatchTaskNumber; - } - - public void setDispatchTaskNumber(int dispatchTaskNumber) { - this.dispatchTaskNumber = dispatchTaskNumber; - } - - public HostSelector getHostSelector() { - return hostSelector; - } - - public void setHostSelector(HostSelector hostSelector) { - this.hostSelector = hostSelector; - } - - public int getHeartbeatInterval() { - return heartbeatInterval; - } - - public void setHeartbeatInterval(int heartbeatInterval) { - this.heartbeatInterval = heartbeatInterval; - } - - public int getTaskCommitRetryTimes() { - return taskCommitRetryTimes; - } - - public void setTaskCommitRetryTimes(int taskCommitRetryTimes) { - this.taskCommitRetryTimes = 
taskCommitRetryTimes; - } - - public int getTaskCommitInterval() { - return taskCommitInterval; - } - - public void setTaskCommitInterval(int taskCommitInterval) { - this.taskCommitInterval = taskCommitInterval; - } - - public int getStateWheelInterval() { - return stateWheelInterval; - } - - public void setStateWheelInterval(int stateWheelInterval) { - this.stateWheelInterval = stateWheelInterval; - } - - public double getMaxCpuLoadAvg() { - return maxCpuLoadAvg > 0 ? maxCpuLoadAvg : Runtime.getRuntime().availableProcessors() * 2; - } - - public void setMaxCpuLoadAvg(double maxCpuLoadAvg) { - this.maxCpuLoadAvg = maxCpuLoadAvg; - } - - public double getReservedMemory() { - return reservedMemory; - } - - public void setReservedMemory(double reservedMemory) { - this.reservedMemory = reservedMemory; - } - - public int getFailoverInterval() { - return failoverInterval; - } - - public void setFailoverInterval(int failoverInterval) { - this.failoverInterval = failoverInterval; - } - - public boolean isKillYarnJobWhenTaskFailover() { - return killYarnJobWhenTaskFailover; - } - - public void setKillYarnJobWhenTaskFailover(boolean killYarnJobWhenTaskFailover) { - this.killYarnJobWhenTaskFailover = killYarnJobWhenTaskFailover; + private Duration stateWheelInterval = Duration.ofMillis(5); + private double maxCpuLoadAvg = -1; + private double reservedMemory = 0.3; + private Duration failoverInterval = Duration.ofMinutes(10); + private boolean killYarnJobWhenTaskFailover = true; + + @Override + public boolean supports(Class<?> clazz) { + return MasterConfig.class.isAssignableFrom(clazz); + } + + @Override + public void validate(Object target, Errors errors) { + MasterConfig masterConfig = (MasterConfig) target; + if (masterConfig.getListenPort() <= 0) { + errors.rejectValue("listen-port", null, "is invalidated"); + } + if (masterConfig.getFetchCommandNum() <= 0) { + errors.rejectValue("fetch-command-num", null, "should be a positive value"); + } + if 
(masterConfig.getPreExecThreads() <= 0) { + errors.rejectValue("pre-exec-threads", null, "should be a positive value"); + } + if (masterConfig.getExecThreads() <= 0) { + errors.rejectValue("exec-threads", null, "should be a positive value"); + } + if (masterConfig.getDispatchTaskNumber() <= 0) { + errors.rejectValue("dispatch-task-number", null, "should be a positive value"); + } + if (masterConfig.getHeartbeatInterval().toMillis() <= 0) { + errors.rejectValue("heartbeat-interval", null, "should be a valid duration"); + } + if (masterConfig.getTaskCommitRetryTimes() <= 0) { + errors.rejectValue("task-commit-retry-times", null, "should be a positive value"); + } + if (masterConfig.getTaskCommitInterval().toMillis() <= 0) { + errors.rejectValue("task-commit-interval", null, "should be a valid duration"); + } + if (masterConfig.getStateWheelInterval().toMillis() <= 0) { + errors.rejectValue("state-wheel-interval", null, "should be a valid duration"); + } + if (masterConfig.getFailoverInterval().toMillis() <= 0) { + errors.rejectValue("failover-interval", null, "should be a valid duration"); + } + if (masterConfig.getMaxCpuLoadAvg() <= 0) { + masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2); + } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index 771bac4a7a..486c360117 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.commons.lang3.StringUtils; +import java.time.Duration; import java.util.Collections; import 
java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; @@ -187,7 +188,7 @@ public class MasterRegistryClient { void registry() { logger.info("Master node : {} registering to registry center", masterAddress); String localNodePath = getCurrentNodePath(); - int masterHeartbeatInterval = masterConfig.getHeartbeatInterval(); + Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval(); HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory(), @@ -210,7 +211,7 @@ public class MasterRegistryClient { // delete dead server registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP); - this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval, TimeUnit.SECONDS); - logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s", masterAddress, masterHeartbeatInterval); + this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval.toMillis(), TimeUnit.MILLISECONDS); + logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}ms", masterAddress, masterHeartbeatInterval.toMillis()); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java index d6f3937f4b..63f4215f27 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java @@ -67,7 +67,7 @@ public class FailoverExecuteThread extends BaseDaemonThread { } catch (Exception e) { logger.error("Master failover thread execute error", e); } finally { - 
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * masterConfig.getFailoverInterval() * 60); + ThreadUtils.sleep(masterConfig.getFailoverInterval().toMillis()); } } } diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index 65c7db924d..8b92696723 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -93,7 +93,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread { @Override public void run() { - Duration checkInterval = Duration.ofMillis(masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS); + Duration checkInterval = masterConfig.getStateWheelInterval(); while (Stopper.isRunning()) { try { checkTask4Timeout(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 12c3332961..4a91d8d7dc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -63,6 +63,7 @@ import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -107,7 +108,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { protected int maxRetryTimes; - protected int commitInterval; + protected long commitInterval; protected ProcessService processService; @@ -125,7 +126,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { this.taskInstance = taskInstance; this.processInstance = 
processInstance; this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes(); - this.commitInterval = masterConfig.getTaskCommitInterval(); + this.commitInterval = masterConfig.getTaskCommitInterval().toMillis(); } protected javax.sql.DataSource defaultDataSource = diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml index 65163cdb3c..a277272026 100644 --- a/dolphinscheduler-master/src/main/resources/application.yaml +++ b/dolphinscheduler-master/src/main/resources/application.yaml @@ -96,19 +96,19 @@ master: dispatch-task-number: 3 # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight host-selector: lower_weight - # master heartbeat interval, the unit is second - heartbeat-interval: 10 + # master heartbeat interval + heartbeat-interval: 10s # master commit task retry times task-commit-retry-times: 5 - # master commit task interval, the unit is millisecond - task-commit-interval: 1000 + # master commit task interval + task-commit-interval: 1s - state-wheel-interval: 5 + state-wheel-interval: 5s # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2 max-cpu-load-avg: -1 # master reserved memory, only lower than system available memory, master server can schedule.
default value 0.3, the unit is G reserved-memory: 0.3 - # failover interval, the unit is minute - failover-interval: 10 + # failover interval + failover-interval: 10m # kill yarn jon when failover taskInstance, default true kill-yarn-job-when-task-failover: true diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java index d5a1a79cf3..64bc291152 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java @@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; +import java.time.Duration; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -77,7 +78,7 @@ public class BlockingTaskTest { config = new MasterConfig(); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); config.setTaskCommitRetryTimes(3); - config.setTaskCommitInterval(1000); + config.setTaskCommitInterval(Duration.ofSeconds(1)); // mock process service processService = Mockito.mock(ProcessService.class); @@ -122,7 +123,7 @@ public class BlockingTaskTest { Mockito.when(processService .submitTaskWithRetry(Mockito.any(ProcessInstance.class) , Mockito.any(TaskInstance.class) - , Mockito.any(Integer.class), Mockito.any(Integer.class))) + , Mockito.any(Integer.class), Mockito.any(Long.class))) .thenReturn(taskInstance); return taskInstance; } diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java index c4d90718b6..7d01c44746
100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java @@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; +import java.time.Duration; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -66,7 +67,7 @@ public class ConditionsTaskTest { MasterConfig config = new MasterConfig(); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); config.setTaskCommitRetryTimes(3); - config.setTaskCommitInterval(1000); + config.setTaskCommitInterval(Duration.ofSeconds(1)); processService = Mockito.mock(ProcessService.class); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java index 8172fd44a0..50b03c86e2 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java @@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; +import java.time.Duration; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -85,7 +86,7 @@ public class DependentTaskTest { MasterConfig config = new MasterConfig(); config.setTaskCommitRetryTimes(3); - config.setTaskCommitInterval(1000); + 
config.setTaskCommitInterval(Duration.ofSeconds(1)); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); processService = Mockito.mock(ProcessService.class); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java index 200dec0b3d..fa51a2d6be 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java @@ -44,6 +44,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.springframework.context.ApplicationContext; +import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -69,7 +70,7 @@ public class SubProcessTaskTest { MasterConfig config = new MasterConfig(); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); config.setTaskCommitRetryTimes(3); - config.setTaskCommitInterval(1000); + config.setTaskCommitInterval(Duration.ofSeconds(1)); PowerMockito.mockStatic(Stopper.class); PowerMockito.when(Stopper.isRunning()).thenReturn(true); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java index 824726bb6f..6f94a22e2f 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import 
org.apache.dolphinscheduler.service.process.ProcessService; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -59,7 +60,7 @@ public class SwitchTaskTest { MasterConfig config = new MasterConfig(); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); config.setTaskCommitRetryTimes(3); - config.setTaskCommitInterval(1000); + config.setTaskCommitInterval(Duration.ofSeconds(1)); processService = Mockito.mock(ProcessService.class); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index d6131174b3..588fb59422 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -103,7 +103,7 @@ public interface ProcessService { void setSubProcessParam(ProcessInstance subProcessInstance); - TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, int commitInterval); + TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, long commitInterval); @Transactional(rollbackFor = Exception.class) TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 7021f4aeef..965ab89801 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -1233,7 +1233,7 @@ public class ProcessServiceImpl implements ProcessService { * retry submit task to db */ @Override - public TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, int commitInterval) { + public TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, long commitInterval) { int retryTimes = 1; TaskInstance task = null; while (retryTimes <= commitRetryTimes) { diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index 7be661de4e..cf5ce5ea88 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -114,19 +114,19 @@ master: dispatch-task-number: 3 # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight host-selector: lower_weight - # master heartbeat interval, the unit is second - heartbeat-interval: 10 + # master heartbeat interval + heartbeat-interval: 10s # master commit task retry times task-commit-retry-times: 5 - # master commit task interval, the unit is millisecond - task-commit-interval: 1000 + # master commit task interval + task-commit-interval: 1s - state-wheel-interval: 5 + state-wheel-interval: 5s # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2 max-cpu-load-avg: -1 # master reserved memory, only lower than system available memory, master server can schedule.
default value 0.3, the unit is G reserved-memory: 0.3 - # failover interval, the unit is minute - failover-interval: 10 + # failover interval + failover-interval: 10m # kill yarn jon when failover taskInstance, default true kill-yarn-job-when-task-failover: true @@ -135,8 +135,8 @@ worker: listen-port: 1234 # worker execute thread number to limit task instances in parallel exec-threads: 10 - # worker heartbeat interval, the unit is second - heartbeat-interval: 10 + # worker heartbeat interval + heartbeat-interval: 10s # worker host weight to dispatch tasks, default value 100 host-weight: 100 # worker tenant auto create diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 6edff09f97..a641b28e1b 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -17,104 +17,53 @@ package org.apache.dolphinscheduler.server.worker.config; +import java.time.Duration; import java.util.Set; import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; +import org.springframework.validation.Errors; +import org.springframework.validation.Validator; +import org.springframework.validation.annotation.Validated; -@Configuration -@EnableConfigurationProperties -@ConfigurationProperties("worker") -public class WorkerConfig { - private int listenPort; - private int execThreads; - private int heartbeatInterval; - private int hostWeight; - private boolean tenantAutoCreate; - private int maxCpuLoadAvg; - private double reservedMemory; - private Set<String> groups; - private String 
alertListenHost; - private int alertListenPort; - - public int getListenPort() { - return listenPort; - } - - public void setListenPort(int listenPort) { - this.listenPort = listenPort; - } - - public int getExecThreads() { - return execThreads; - } - - public void setExecThreads(int execThreads) { - this.execThreads = execThreads; - } - - public int getHeartbeatInterval() { - return heartbeatInterval; - } - - public void setHeartbeatInterval(int heartbeatInterval) { - this.heartbeatInterval = heartbeatInterval; - } - - public int getHostWeight() { - return hostWeight; - } - - public void setHostWeight(int hostWeight) { - this.hostWeight = hostWeight; - } - - public boolean isTenantAutoCreate() { - return tenantAutoCreate; - } +import com.google.common.collect.Sets; - public void setTenantAutoCreate(boolean tenantAutoCreate) { - this.tenantAutoCreate = tenantAutoCreate; - } - - public int getMaxCpuLoadAvg() { - return maxCpuLoadAvg > 0 ? maxCpuLoadAvg : Runtime.getRuntime().availableProcessors() * 2; - } - - public void setMaxCpuLoadAvg(int maxCpuLoadAvg) { - this.maxCpuLoadAvg = maxCpuLoadAvg; - } +import lombok.Data; - public double getReservedMemory() { - return reservedMemory; - } - - public void setReservedMemory(double reservedMemory) { - this.reservedMemory = reservedMemory; - } - - public Set<String> getGroups() { - return groups; - } - - public void setGroups(Set<String> groups) { - this.groups = groups; - } - - public String getAlertListenHost() { - return alertListenHost; - } - - public void setAlertListenHost(String alertListenHost) { - this.alertListenHost = alertListenHost; - } - - public int getAlertListenPort() { - return alertListenPort; - } +@Data +@Validated +@Configuration +@ConfigurationProperties(prefix = "worker") +public class WorkerConfig implements Validator { + private int listenPort = 1234; + private int execThreads = 10; + private Duration heartbeatInterval = Duration.ofSeconds(10); + private int hostWeight = 100; + private boolean 
tenantAutoCreate = true; + private boolean tenantDistributedUser = false; + private int maxCpuLoadAvg = -1; + private double reservedMemory = 0.3; + private Set<String> groups = Sets.newHashSet("default"); + private String alertListenHost = "localhost"; + private int alertListenPort = 50052; + + @Override + public boolean supports(Class<?> clazz) { + return WorkerConfig.class.isAssignableFrom(clazz); + } + + @Override + public void validate(Object target, Errors errors) { + WorkerConfig workerConfig = (WorkerConfig) target; + if (workerConfig.getExecThreads() <= 0) { + errors.rejectValue("exec-threads", null, "should be a positive value"); + } + if (workerConfig.getHeartbeatInterval().toMillis() <= 0) { + errors.rejectValue("heartbeat-interval", null, "should be a valid duration"); + } + if (workerConfig.getMaxCpuLoadAvg() <= 0) { + workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2); + } - public void setAlertListenPort(final int alertListenPort) { - this.alertListenPort = alertListenPort; } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java index e218b0a4aa..b33b3ef0d7 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java @@ -99,7 +99,7 @@ public class WorkerRegistryClient { public void registry() { String address = NetUtils.getAddr(workerConfig.getListenPort()); Set<String> workerZkPaths = getWorkerZkPaths(); - int workerHeartbeatInterval = workerConfig.getHeartbeatInterval(); + long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds(); HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, workerConfig.getMaxCpuLoadAvg(),
diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml b/dolphinscheduler-worker/src/main/resources/application.yaml index 397e533db9..d764f9d25b 100644 --- a/dolphinscheduler-worker/src/main/resources/application.yaml +++ b/dolphinscheduler-worker/src/main/resources/application.yaml @@ -58,8 +58,8 @@ worker: listen-port: 1234 # worker execute thread number to limit task instances in parallel exec-threads: 100 - # worker heartbeat interval, the unit is second - heartbeat-interval: 10 + # worker heartbeat interval + heartbeat-interval: 10s # worker host weight to dispatch tasks, default value 100 host-weight: 100 # worker tenant auto create diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java index 56af82d9d8..4bd2161e83 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.registry.RegistryClient; +import java.time.Duration; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -86,15 +87,15 @@ public class WorkerRegistryClientTest { @Test public void testRegistry() { workerRegistryClient.initWorkRegistry(); - + given(workerManagerThread.getThreadPoolQueueSize()).willReturn(1); - + given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(NodeType.class))).willReturn(true); - - given(workerConfig.getHeartbeatInterval()).willReturn(1); - + + 
given(workerConfig.getHeartbeatInterval()).willReturn(Duration.ofSeconds(1)); + workerRegistryClient.registry(); - + Mockito.verify(registryClient, Mockito.times(1)).handleDeadServer(Mockito.anyCollection(), Mockito.any(NodeType.class), Mockito.anyString()); }
