This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 4b224ae2e513b1124a0abaa4b5cde4b964d27a28
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Jun 28 20:17:43 2022 +0800

    Validate master/worker config (#10649)
    
    (cherry picked from commit 35b25da863e38df817bfe3cb2ef758d426b8b699)
---
 .../server/master/config/MasterConfig.java         | 206 +++++++--------------
 .../master/registry/MasterRegistryClient.java      |   5 +-
 .../master/runner/FailoverExecuteThread.java       |   2 +-
 .../master/runner/StateWheelExecuteThread.java     |   2 +-
 .../master/runner/task/BaseTaskProcessor.java      |   5 +-
 .../src/main/resources/application.yaml            |  10 +-
 .../server/master/BlockingTaskTest.java            |   5 +-
 .../server/master/ConditionsTaskTest.java          |   3 +-
 .../server/master/DependentTaskTest.java           |   3 +-
 .../server/master/SubProcessTaskTest.java          |   3 +-
 .../server/master/SwitchTaskTest.java              |   3 +-
 .../service/process/ProcessService.java            |   2 +-
 .../service/process/ProcessServiceImpl.java        |   2 +-
 .../src/main/resources/application.yaml            |  16 +-
 .../server/worker/config/WorkerConfig.java         | 131 ++++---------
 .../worker/registry/WorkerRegistryClient.java      |   2 +-
 .../src/main/resources/application.yaml            |   4 +-
 .../worker/registry/WorkerRegistryClientTest.java  |  13 +-
 18 files changed, 156 insertions(+), 261 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 76d4ae1525..0494b7dd00 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -21,170 +21,108 @@ import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelect
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.stereotype.Component;
+import java.time.Duration;
 
-@Component
-@EnableConfigurationProperties
-@ConfigurationProperties("master")
-public class MasterConfig {
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.validation.Errors;
+import org.springframework.validation.Validator;
+import org.springframework.validation.annotation.Validated;
+
+import lombok.Data;
+
+@Data
+@Validated
+@Configuration
+@ConfigurationProperties(prefix = "master")
+public class MasterConfig implements Validator {
     /**
      * The master RPC server listen port.
      */
-    private int listenPort;
+    private int listenPort = 5678;
     /**
      * The max batch size used to fetch command from database.
      */
-    private int fetchCommandNum;
+    private int fetchCommandNum = 10;
     /**
     * The thread number used to prepare processInstance. This number shouldn't bigger than fetchCommandNum.
      */
-    private int preExecThreads;
+    private int preExecThreads = 10;
     /**
      * todo: We may need to split the process/task into different thread size.
      * The thread number used to handle processInstance and task event.
     * Will create two thread poll to execute {@link WorkflowExecuteRunnable} and {@link TaskExecuteRunnable}.
      */
-    private int execThreads;
+    private int execThreads = 10;
     /**
      * The task dispatch thread pool size.
      */
-    private int dispatchTaskNumber;
+    private int dispatchTaskNumber = 3;
     /**
      * Worker select strategy.
      */
-    private HostSelector hostSelector;
+    private HostSelector hostSelector = HostSelector.LOWER_WEIGHT;
     /**
      * Master heart beat task execute interval.
      */
-    private int heartbeatInterval;
+    private Duration heartbeatInterval = Duration.ofSeconds(10);
     /**
      * task submit max retry times.
      */
-    private int taskCommitRetryTimes;
+    private int taskCommitRetryTimes = 5;
     /**
-     * task submit retry interval/ms.
+     * task submit retry interval.
      */
-    private int taskCommitInterval;
+    private Duration taskCommitInterval = Duration.ofSeconds(1);
     /**
-     * state wheel check interval/ms, if this value is bigger, may increase the delay of task/processInstance.
+     * state wheel check interval, if this value is bigger, may increase the delay of task/processInstance.
      */
-    private int stateWheelInterval;
-    private double maxCpuLoadAvg;
-    private double reservedMemory;
-    private int failoverInterval;
-    private boolean killYarnJobWhenTaskFailover;
-
-    public int getListenPort() {
-        return listenPort;
-    }
-
-    public void setListenPort(int listenPort) {
-        this.listenPort = listenPort;
-    }
-
-    public int getFetchCommandNum() {
-        return fetchCommandNum;
-    }
-
-    public void setFetchCommandNum(int fetchCommandNum) {
-        this.fetchCommandNum = fetchCommandNum;
-    }
-
-    public int getPreExecThreads() {
-        return preExecThreads;
-    }
-
-    public void setPreExecThreads(int preExecThreads) {
-        this.preExecThreads = preExecThreads;
-    }
-
-    public int getExecThreads() {
-        return execThreads;
-    }
-
-    public void setExecThreads(int execThreads) {
-        this.execThreads = execThreads;
-    }
-
-    public int getDispatchTaskNumber() {
-        return dispatchTaskNumber;
-    }
-
-    public void setDispatchTaskNumber(int dispatchTaskNumber) {
-        this.dispatchTaskNumber = dispatchTaskNumber;
-    }
-
-    public HostSelector getHostSelector() {
-        return hostSelector;
-    }
-
-    public void setHostSelector(HostSelector hostSelector) {
-        this.hostSelector = hostSelector;
-    }
-
-    public int getHeartbeatInterval() {
-        return heartbeatInterval;
-    }
-
-    public void setHeartbeatInterval(int heartbeatInterval) {
-        this.heartbeatInterval = heartbeatInterval;
-    }
-
-    public int getTaskCommitRetryTimes() {
-        return taskCommitRetryTimes;
-    }
-
-    public void setTaskCommitRetryTimes(int taskCommitRetryTimes) {
-        this.taskCommitRetryTimes = taskCommitRetryTimes;
-    }
-
-    public int getTaskCommitInterval() {
-        return taskCommitInterval;
-    }
-
-    public void setTaskCommitInterval(int taskCommitInterval) {
-        this.taskCommitInterval = taskCommitInterval;
-    }
-
-    public int getStateWheelInterval() {
-        return stateWheelInterval;
-    }
-
-    public void setStateWheelInterval(int stateWheelInterval) {
-        this.stateWheelInterval = stateWheelInterval;
-    }
-
-    public double getMaxCpuLoadAvg() {
-        return maxCpuLoadAvg > 0 ? maxCpuLoadAvg : Runtime.getRuntime().availableProcessors() * 2;
-    }
-
-    public void setMaxCpuLoadAvg(double maxCpuLoadAvg) {
-        this.maxCpuLoadAvg = maxCpuLoadAvg;
-    }
-
-    public double getReservedMemory() {
-        return reservedMemory;
-    }
-
-    public void setReservedMemory(double reservedMemory) {
-        this.reservedMemory = reservedMemory;
-    }
-
-    public int getFailoverInterval() {
-        return failoverInterval;
-    }
-
-    public void setFailoverInterval(int failoverInterval) {
-        this.failoverInterval = failoverInterval;
-    }
-
-    public boolean isKillYarnJobWhenTaskFailover() {
-        return killYarnJobWhenTaskFailover;
-    }
-
-    public void setKillYarnJobWhenTaskFailover(boolean killYarnJobWhenTaskFailover) {
-        this.killYarnJobWhenTaskFailover = killYarnJobWhenTaskFailover;
+    private Duration stateWheelInterval = Duration.ofMillis(5);
+    private double maxCpuLoadAvg = -1;
+    private double reservedMemory = 0.3;
+    private Duration failoverInterval = Duration.ofMinutes(10);
+    private boolean killYarnJobWhenTaskFailover = true;
+
+    @Override
+    public boolean supports(Class<?> clazz) {
+        return MasterConfig.class.isAssignableFrom(clazz);
+    }
+
+    @Override
+    public void validate(Object target, Errors errors) {
+        MasterConfig masterConfig = (MasterConfig) target;
+        if (masterConfig.getListenPort() <= 0) {
+            errors.rejectValue("listen-port", null, "is invalidated");
+        }
+        if (masterConfig.getFetchCommandNum() <= 0) {
+            errors.rejectValue("fetch-command-num", null, "should be a positive value");
+        }
+        if (masterConfig.getPreExecThreads() <= 0) {
+            errors.rejectValue("per-exec-threads", null, "should be a positive value");
+        }
+        if (masterConfig.getExecThreads() <= 0) {
+            errors.rejectValue("exec-threads", null, "should be a positive value");
+        }
+        if (masterConfig.getDispatchTaskNumber() <= 0) {
+            errors.rejectValue("dispatch-task-number", null, "should be a positive value");
+        }
+        if (masterConfig.getHeartbeatInterval().toMillis() < 0) {
+            errors.rejectValue("heartbeat-interval", null, "should be a valid duration");
+        }
+        if (masterConfig.getTaskCommitRetryTimes() <= 0) {
+            errors.rejectValue("task-commit-retry-times", null, "should be a positive value");
+        }
+        if (masterConfig.getTaskCommitInterval().toMillis() <= 0) {
+            errors.rejectValue("task-commit-interval", null, "should be a valid duration");
+        }
+        if (masterConfig.getStateWheelInterval().toMillis() <= 0) {
+            errors.rejectValue("state-wheel-interval", null, "should be a valid duration");
+        }
+        if (masterConfig.getFailoverInterval().toMillis() <= 0) {
+            errors.rejectValue("failover-interval", null, "should be a valid duration");
+        }
+        if (masterConfig.getMaxCpuLoadAvg() <= 0) {
+            masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
+        }
     }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 771bac4a7a..486c360117 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -187,7 +188,7 @@ public class MasterRegistryClient {
     void registry() {
         logger.info("Master node : {} registering to registry center", masterAddress);
         String localNodePath = getCurrentNodePath();
-        int masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
+        Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
         HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
                                                         
masterConfig.getMaxCpuLoadAvg(),
                                                         
masterConfig.getReservedMemory(),
@@ -210,7 +211,7 @@ public class MasterRegistryClient {
         // delete dead server
         registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP);
 
-        this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval, TimeUnit.SECONDS);
+        this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval.getSeconds(), TimeUnit.SECONDS);
         logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s", masterAddress, masterHeartbeatInterval);
 
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index d6f3937f4b..63f4215f27 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -67,7 +67,7 @@ public class FailoverExecuteThread extends BaseDaemonThread {
             } catch (Exception e) {
                 logger.error("Master failover thread execute error", e);
             } finally {
-                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * masterConfig.getFailoverInterval() * 60);
+                ThreadUtils.sleep(masterConfig.getFailoverInterval().toMillis());
             }
         }
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 65c7db924d..8b92696723 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -93,7 +93,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
 
     @Override
     public void run() {
-        Duration checkInterval = Duration.ofMillis(masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);
+        Duration checkInterval = masterConfig.getStateWheelInterval();
         while (Stopper.isRunning()) {
             try {
                 checkTask4Timeout();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 12c3332961..4a91d8d7dc 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -63,6 +63,7 @@ import org.apache.dolphinscheduler.spi.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -107,7 +108,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
 
     protected int maxRetryTimes;
 
-    protected int commitInterval;
+    protected long commitInterval;
 
     protected ProcessService processService;
 
@@ -125,7 +126,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
         this.taskInstance = taskInstance;
         this.processInstance = processInstance;
         this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes();
-        this.commitInterval = masterConfig.getTaskCommitInterval();
+        this.commitInterval = masterConfig.getTaskCommitInterval().toMillis();
     }
 
     protected javax.sql.DataSource defaultDataSource =
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml
index 65163cdb3c..a277272026 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -96,19 +96,19 @@ master:
   dispatch-task-number: 3
   # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
   host-selector: lower_weight
-  # master heartbeat interval, the unit is second
-  heartbeat-interval: 10
+  # master heartbeat interval
+  heartbeat-interval: 10s
   # master commit task retry times
   task-commit-retry-times: 5
-  # master commit task interval, the unit is millisecond
-  task-commit-interval: 1000
+  # master commit task interval
+  task-commit-interval: 1s
   state-wheel-interval: 5
   # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
   max-cpu-load-avg: -1
   # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
   reserved-memory: 0.3
   # failover interval, the unit is minute
-  failover-interval: 10
+  failover-interval: 10m
   # kill yarn jon when failover taskInstance, default true
   kill-yarn-job-when-task-failover: true
 
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
index d5a1a79cf3..64bc291152 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
@@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
@@ -77,7 +78,7 @@ public class BlockingTaskTest {
         config = new MasterConfig();
         
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
         config.setTaskCommitRetryTimes(3);
-        config.setTaskCommitInterval(1000);
+        config.setTaskCommitInterval(Duration.ofSeconds(1));
 
         // mock process service
         processService = Mockito.mock(ProcessService.class);
@@ -122,7 +123,7 @@ public class BlockingTaskTest {
         Mockito.when(processService
             .submitTaskWithRetry(Mockito.any(ProcessInstance.class)
                 , Mockito.any(TaskInstance.class)
-                , Mockito.any(Integer.class), Mockito.any(Integer.class)))
+                , Mockito.any(Integer.class), Mockito.any(Long.class)))
             .thenReturn(taskInstance);
         return taskInstance;
     }
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
index c4d90718b6..7d01c44746 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
@@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -66,7 +67,7 @@ public class ConditionsTaskTest {
         MasterConfig config = new MasterConfig();
         
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
         config.setTaskCommitRetryTimes(3);
-        config.setTaskCommitInterval(1000);
+        config.setTaskCommitInterval(Duration.ofSeconds(1));
 
         processService = Mockito.mock(ProcessService.class);
         
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
index 8172fd44a0..50b03c86e2 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
@@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
+import java.time.Duration;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -85,7 +86,7 @@ public class DependentTaskTest {
 
         MasterConfig config = new MasterConfig();
         config.setTaskCommitRetryTimes(3);
-        config.setTaskCommitInterval(1000);
+        config.setTaskCommitInterval(Duration.ofSeconds(1));
         
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
 
         processService = Mockito.mock(ProcessService.class);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
index 200dec0b3d..fa51a2d6be 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
@@ -44,6 +44,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.springframework.context.ApplicationContext;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -69,7 +70,7 @@ public class SubProcessTaskTest {
         MasterConfig config = new MasterConfig();
         
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
         config.setTaskCommitRetryTimes(3);
-        config.setTaskCommitInterval(1000);
+        config.setTaskCommitInterval(Duration.ofSeconds(1));
 
         PowerMockito.mockStatic(Stopper.class);
         PowerMockito.when(Stopper.isRunning()).thenReturn(true);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
index 824726bb6f..6f94a22e2f 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -59,7 +60,7 @@ public class SwitchTaskTest {
         MasterConfig config = new MasterConfig();
         
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
         config.setTaskCommitRetryTimes(3);
-        config.setTaskCommitInterval(1000);
+        config.setTaskCommitInterval(Duration.ofSeconds(1));
 
         processService = Mockito.mock(ProcessService.class);
         
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index d6131174b3..588fb59422 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -103,7 +103,7 @@ public interface ProcessService {
 
     void setSubProcessParam(ProcessInstance subProcessInstance);
 
-    TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, int commitInterval);
+    TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, long commitInterval);
 
     @Transactional(rollbackFor = Exception.class)
     TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 7021f4aeef..965ab89801 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1233,7 +1233,7 @@ public class ProcessServiceImpl implements ProcessService {
      * retry submit task to db
      */
     @Override
-    public TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, int commitInterval) {
+    public TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, long commitInterval) {
         int retryTimes = 1;
         TaskInstance task = null;
         while (retryTimes <= commitRetryTimes) {
diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index 7be661de4e..cf5ce5ea88 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -114,19 +114,19 @@ master:
   dispatch-task-number: 3
   # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
   host-selector: lower_weight
-  # master heartbeat interval, the unit is second
-  heartbeat-interval: 10
+  # master heartbeat interval
+  heartbeat-interval: 10s
   # master commit task retry times
   task-commit-retry-times: 5
-  # master commit task interval, the unit is millisecond
-  task-commit-interval: 1000
+  # master commit task interval
+  task-commit-interval: 1s
   state-wheel-interval: 5
   # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
   max-cpu-load-avg: -1
   # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
   reserved-memory: 0.3
-  # failover interval, the unit is minute
-  failover-interval: 10
+  # failover interval
+  failover-interval: 10m
   # kill yarn jon when failover taskInstance, default true
   kill-yarn-job-when-task-failover: true
 
@@ -135,8 +135,8 @@ worker:
   listen-port: 1234
   # worker execute thread number to limit task instances in parallel
   exec-threads: 10
-  # worker heartbeat interval, the unit is second
-  heartbeat-interval: 10
+  # worker heartbeat interval
+  heartbeat-interval: 10s
   # worker host weight to dispatch tasks, default value 100
   host-weight: 100
   # worker tenant auto create
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 6edff09f97..a641b28e1b 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -17,104 +17,53 @@
 
 package org.apache.dolphinscheduler.server.worker.config;
 
+import java.time.Duration;
 import java.util.Set;
 
 import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.validation.Errors;
+import org.springframework.validation.Validator;
+import org.springframework.validation.annotation.Validated;
 
-@Configuration
-@EnableConfigurationProperties
-@ConfigurationProperties("worker")
-public class WorkerConfig {
-    private int listenPort;
-    private int execThreads;
-    private int heartbeatInterval;
-    private int hostWeight;
-    private boolean tenantAutoCreate;
-    private int maxCpuLoadAvg;
-    private double reservedMemory;
-    private Set<String> groups;
-    private String alertListenHost;
-    private int alertListenPort;
-
-    public int getListenPort() {
-        return listenPort;
-    }
-
-    public void setListenPort(int listenPort) {
-        this.listenPort = listenPort;
-    }
-
-    public int getExecThreads() {
-        return execThreads;
-    }
-
-    public void setExecThreads(int execThreads) {
-        this.execThreads = execThreads;
-    }
-
-    public int getHeartbeatInterval() {
-        return heartbeatInterval;
-    }
-
-    public void setHeartbeatInterval(int heartbeatInterval) {
-        this.heartbeatInterval = heartbeatInterval;
-    }
-
-    public int getHostWeight() {
-        return hostWeight;
-    }
-
-    public void setHostWeight(int hostWeight) {
-        this.hostWeight = hostWeight;
-    }
-
-    public boolean isTenantAutoCreate() {
-        return tenantAutoCreate;
-    }
+import com.google.common.collect.Sets;
 
-    public void setTenantAutoCreate(boolean tenantAutoCreate) {
-        this.tenantAutoCreate = tenantAutoCreate;
-    }
-
-    public int getMaxCpuLoadAvg() {
-        return maxCpuLoadAvg > 0 ? maxCpuLoadAvg : Runtime.getRuntime().availableProcessors() * 2;
-    }
-
-    public void setMaxCpuLoadAvg(int maxCpuLoadAvg) {
-        this.maxCpuLoadAvg = maxCpuLoadAvg;
-    }
+import lombok.Data;
 
-    public double getReservedMemory() {
-        return reservedMemory;
-    }
-
-    public void setReservedMemory(double reservedMemory) {
-        this.reservedMemory = reservedMemory;
-    }
-
-    public Set<String> getGroups() {
-        return groups;
-    }
-
-    public void setGroups(Set<String> groups) {
-        this.groups = groups;
-    }
-
-    public String getAlertListenHost() {
-        return alertListenHost;
-    }
-
-    public void setAlertListenHost(String alertListenHost) {
-        this.alertListenHost = alertListenHost;
-    }
-
-    public int getAlertListenPort() {
-        return alertListenPort;
-    }
+@Data
+@Validated
+@Configuration
+@ConfigurationProperties(prefix = "worker")
+public class WorkerConfig implements Validator {
+    private int listenPort = 1234;
+    private int execThreads = 10;
+    private Duration heartbeatInterval = Duration.ofSeconds(10);
+    private int hostWeight = 100;
+    private boolean tenantAutoCreate = true;
+    private boolean tenantDistributedUser = false;
+    private int maxCpuLoadAvg = -1;
+    private double reservedMemory = 0.3;
+    private Set<String> groups = Sets.newHashSet("default");
+    private String alertListenHost = "localhost";
+    private int alertListenPort = 50052;
+
+    @Override
+    public boolean supports(Class<?> clazz) {
+        return WorkerConfig.class.isAssignableFrom(clazz);
+    }
+
+    @Override
+    public void validate(Object target, Errors errors) {
+        WorkerConfig workerConfig = (WorkerConfig) target;
+        if (workerConfig.getExecThreads() <= 0) {
+            errors.rejectValue("exec-threads", null, "should be a positive value");
+        }
+        if (workerConfig.getHeartbeatInterval().toMillis() <= 0) {
+            errors.rejectValue("heartbeat-interval", null, "shoule be a valid duration");
+        }
+        if (workerConfig.getMaxCpuLoadAvg() <= 0) {
+            workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
+        }
 
-    public void setAlertListenPort(final int alertListenPort) {
-        this.alertListenPort = alertListenPort;
     }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index e218b0a4aa..b33b3ef0d7 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -99,7 +99,7 @@ public class WorkerRegistryClient {
     public void registry() {
         String address = NetUtils.getAddr(workerConfig.getListenPort());
         Set<String> workerZkPaths = getWorkerZkPaths();
-        int workerHeartbeatInterval = workerConfig.getHeartbeatInterval();
+        long workerHeartbeatInterval = 
workerConfig.getHeartbeatInterval().getSeconds();
 
         HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
                 workerConfig.getMaxCpuLoadAvg(),
diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml 
b/dolphinscheduler-worker/src/main/resources/application.yaml
index 397e533db9..d764f9d25b 100644
--- a/dolphinscheduler-worker/src/main/resources/application.yaml
+++ b/dolphinscheduler-worker/src/main/resources/application.yaml
@@ -58,8 +58,8 @@ worker:
   listen-port: 1234
   # worker execute thread number to limit task instances in parallel
   exec-threads: 100
-  # worker heartbeat interval, the unit is second
-  heartbeat-interval: 10
+  # worker heartbeat interval
+  heartbeat-interval: 10s
   # worker host weight to dispatch tasks, default value 100
   host-weight: 100
   # worker tenant auto create
diff --git 
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
index 56af82d9d8..4bd2161e83 100644
--- 
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
+++ 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
@@ -24,6 +24,7 @@ import 
org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
+import java.time.Duration;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -86,15 +87,15 @@ public class WorkerRegistryClientTest {
     @Test
     public void testRegistry() {
         workerRegistryClient.initWorkRegistry();
-    
+
         given(workerManagerThread.getThreadPoolQueueSize()).willReturn(1);
-    
+
         given(registryClient.checkNodeExists(Mockito.anyString(), 
Mockito.any(NodeType.class))).willReturn(true);
-    
-        given(workerConfig.getHeartbeatInterval()).willReturn(1);
-    
+
+        
given(workerConfig.getHeartbeatInterval()).willReturn(Duration.ofSeconds(1)); // the interval is stubbed as a Duration; it used to be an int
+
         workerRegistryClient.registry();
-    
+
         Mockito.verify(registryClient, 
Mockito.times(1)).handleDeadServer(Mockito.anyCollection(), 
Mockito.any(NodeType.class), Mockito.anyString());
     }
 

Reply via email to