This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch 3.1.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.1.1-prepare by this push:
     new 0c3ab4335e cherry-pick Add worker-group-refresh-interval in master 
config #12601
0c3ab4335e is described below

commit 0c3ab4335e70a7e9482e2dee245522ebda0b7b37
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Oct 31 09:37:26 2022 +0800

    cherry-pick Add worker-group-refresh-interval in master config #12601
---
 docs/docs/en/architecture/configuration.md         |   1 +
 docs/docs/zh/architecture/configuration.md         |   1 +
 .../server/master/config/MasterConfig.java         |   7 ++
 .../master/dispatch/host/CommonHostManager.java    |   8 +-
 .../server/master/registry/ServerNodeManager.java  | 108 ++++++---------------
 .../master/runner/MasterSchedulerBootstrap.java    |  11 ++-
 .../src/main/resources/application.yaml            |   1 +
 .../dispatch/host/RoundRobinHostManagerTest.java   |   6 +-
 .../src/main/resources/application.yaml            |   1 +
 9 files changed, 56 insertions(+), 88 deletions(-)

diff --git a/docs/docs/en/architecture/configuration.md 
b/docs/docs/en/architecture/configuration.md
index 661481767d..d174d6c9ed 100644
--- a/docs/docs/en/architecture/configuration.md
+++ b/docs/docs/en/architecture/configuration.md
@@ -277,6 +277,7 @@ Location: `master-server/conf/application.yaml`
 |master.kill-yarn-job-when-task-failover|true|whether to kill yarn job when 
failover taskInstance|
 |master.registry-disconnect-strategy.strategy|stop|Used when the master 
disconnect from registry, default value: stop. Optional values include stop, 
waiting|
 |master.registry-disconnect-strategy.max-waiting-time|100s|Used when the 
master disconnect from registry, and the disconnect strategy is waiting, this 
config means the master will waiting to reconnect to registry in given times, 
and after the waiting times, if the master still cannot connect to registry, 
will stop itself, if the value is 0s, the Master will waitting infinitely|
+|master.worker-group-refresh-interval|10s|The interval at which worker groups are refreshed from the database into memory; must be at least 10s|
 
 ### Worker Server related configuration
 
diff --git a/docs/docs/zh/architecture/configuration.md 
b/docs/docs/zh/architecture/configuration.md
index cf57b32791..68397c7584 100644
--- a/docs/docs/zh/architecture/configuration.md
+++ b/docs/docs/zh/architecture/configuration.md
@@ -272,6 +272,7 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn相关的配置
 |master.kill-yarn-job-when-task-failover|true|当任务实例failover时,是否kill掉yarn job|
 |master.registry-disconnect-strategy.strategy|stop|当Master与注册中心失联之后采取的策略, 
默认值是: stop. 可选值包括: stop, waiting|
 
|master.registry-disconnect-strategy.max-waiting-time|100s|当Master与注册中心失联之后重连时间,
 之后当strategy为waiting时,该值生效。 该值表示当Master与注册中心失联时会在给定时间之内进行重连, 
在给定时间之内重连失败将会停止自己,在重连时,Master会丢弃目前正在执行的工作流,值为0表示会无限期等待 |
+|master.worker-group-refresh-interval|10s|定期将workerGroup从数据库中同步到内存的时间间隔|
 
 ## Worker Server相关配置
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 7242f3915b..8569d822ae 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -91,6 +91,8 @@ public class MasterConfig implements Validator {
     private boolean killYarnJobWhenTaskFailover = true;
     private ConnectStrategyProperties registryDisconnectStrategy = new 
ConnectStrategyProperties();
 
+    private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L);
+
     // ip:listenPort
     private String masterAddress;
 
@@ -138,6 +140,10 @@ public class MasterConfig implements Validator {
         if (masterConfig.getMaxCpuLoadAvg() <= 0) {
             
masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
         }
+        if (masterConfig.getWorkerGroupRefreshInterval().getSeconds() < 10) {
+            errors.rejectValue("worker-group-refresh-interval", null, "should 
>= 10s");
+        }
+
         
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
         masterConfig.setMasterRegistryPath(REGISTRY_DOLPHINSCHEDULER_MASTERS + 
"/" + masterConfig.getMasterAddress());
         printConfig();
@@ -161,5 +167,6 @@ public class MasterConfig implements Validator {
         logger.info("Master config: registryDisconnectStrategy -> {} ", 
registryDisconnectStrategy);
         logger.info("Master config: masterAddress -> {} ", masterAddress);
         logger.info("Master config: masterRegistryPath -> {} ", 
masterRegistryPath);
+        logger.info("Master config: workerGroupRefreshInterval -> {} ", 
workerGroupRefreshInterval);
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
index bbf84a3d74..42bcd56835 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
@@ -17,8 +17,6 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.host;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
@@ -26,7 +24,6 @@ import 
org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.ArrayList;
@@ -79,8 +76,9 @@ public abstract class CommonHostManager implements 
HostManager {
         Set<String> nodes = serverNodeManager.getWorkerGroupNodes(workerGroup);
         if (CollectionUtils.isNotEmpty(nodes)) {
             for (String node : nodes) {
-                WorkerHeartBeat workerNodeInfo = 
serverNodeManager.getWorkerNodeInfo(node);
-                hostWorkers.add(HostWorker.of(node, 
workerNodeInfo.getWorkerHostWeight(), workerGroup));
+                serverNodeManager.getWorkerNodeInfo(node).ifPresent(
+                        workerNodeInfo -> hostWorkers
+                                .add(HostWorker.of(node, 
workerNodeInfo.getWorkerHostWeight(), workerGroup)));
             }
         }
         return hostWorkers;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index e97738cbdf..e0ffc34373 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -48,6 +48,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -66,9 +67,6 @@ import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-/**
- * server node manager
- */
 @Service
 public class ServerNodeManager implements InitializingBean {
 
@@ -89,9 +87,6 @@ public class ServerNodeManager implements InitializingBean {
      */
     private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = 
new ConcurrentHashMap<>();
 
-    /**
-     * master nodes
-     */
     private final Set<String> masterNodes = new HashSet<>();
 
     private final Map<String, WorkerHeartBeat> workerNodeInfo = new 
HashMap<>();
@@ -115,35 +110,36 @@ public class ServerNodeManager implements 
InitializingBean {
     @Autowired
     private MasterConfig masterConfig;
 
-    private List<WorkerInfoChangeListener> workerInfoChangeListeners = new 
ArrayList<>();
+    private final List<WorkerInfoChangeListener> workerInfoChangeListeners = 
new ArrayList<>();
 
-    private static volatile int MASTER_SLOT = 0;
+    private volatile int currentSlot = 0;
 
-    private static volatile int MASTER_SIZE = 0;
+    private volatile int totalSlot = 0;
 
-    public static int getSlot() {
-        return MASTER_SLOT;
+    public int getSlot() {
+        return currentSlot;
     }
 
-    public static int getMasterSize() {
-        return MASTER_SIZE;
+    public int getMasterSize() {
+        return totalSlot;
     }
 
-    /**
-     * init listener
-     *
-     * @throws Exception if error throws Exception
-     */
     @Override
-    public void afterPropertiesSet() throws Exception {
+    public void afterPropertiesSet() {
 
         // load nodes from zookeeper
-        load();
+        updateMasterNodes();
+        updateWorkerNodes();
+        updateWorkerGroupMappings();
 
         // init executor service
         executorService =
                 Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("ServerNodeManagerExecutor"));
-        executorService.scheduleWithFixedDelay(new 
WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS);
+        executorService.scheduleWithFixedDelay(
+                new WorkerNodeInfoAndGroupDbSyncTask(),
+                0,
+                masterConfig.getWorkerGroupRefreshInterval().getSeconds(),
+                TimeUnit.SECONDS);
 
         // init MasterNodeListener listener
         registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new 
MasterDataListener());
@@ -152,19 +148,6 @@ public class ServerNodeManager implements InitializingBean 
{
         registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new 
WorkerDataListener());
     }
 
-    /**
-     * load nodes from zookeeper
-     */
-    public void load() {
-        // master nodes from zookeeper
-        updateMasterNodes();
-        updateWorkerNodes();
-        updateWorkerGroupMappings();
-    }
-
-    /**
-     * worker node info and worker group db sync task
-     */
     class WorkerNodeInfoAndGroupDbSyncTask implements Runnable {
 
         @Override
@@ -251,8 +234,8 @@ public class ServerNodeManager implements InitializingBean {
     }
 
     private void updateMasterNodes() {
-        MASTER_SLOT = 0;
-        MASTER_SIZE = 0;
+        currentSlot = 0;
+        totalSlot = 0;
         this.masterNodes.clear();
         String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
         try {
@@ -325,14 +308,12 @@ public class ServerNodeManager implements 
InitializingBean {
             this.masterPriorityQueue.putList(masterNodes);
             int index = 
masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
             if (index >= 0) {
-                MASTER_SIZE = nodes.size();
-                MASTER_SLOT = index;
+                totalSlot = nodes.size();
+                currentSlot = index;
             } else {
-                logger.warn("current addr:{} is not in active master list",
-                        masterConfig.getMasterAddress());
+                logger.warn("Current master is not in active master list");
             }
-            logger.info("update master nodes, master size: {}, slot: {}, addr: 
{}", MASTER_SIZE,
-                    MASTER_SLOT, masterConfig.getMasterAddress());
+            logger.info("Update master nodes, total master size: {}, current 
slot: {}", totalSlot, currentSlot);
         } finally {
             masterLock.unlock();
         }
@@ -360,10 +341,10 @@ public class ServerNodeManager implements 
InitializingBean {
                 workerGroup = Constants.DEFAULT_WORKER_GROUP;
             }
             Set<String> nodes = workerGroupNodes.get(workerGroup);
-            if (CollectionUtils.isNotEmpty(nodes)) {
-                return Collections.unmodifiableSet(nodes);
+            if (CollectionUtils.isEmpty(nodes)) {
+                return Collections.emptySet();
             }
-            return nodes;
+            return Collections.unmodifiableSet(nodes);
         } finally {
             workerGroupReadLock.unlock();
         }
@@ -373,45 +354,19 @@ public class ServerNodeManager implements 
InitializingBean {
         return Collections.unmodifiableMap(workerNodeInfo);
     }
 
-    /**
-     * get worker node info
-     *
-     * @param workerNode worker node
-     * @return worker node info
-     */
-    public WorkerHeartBeat getWorkerNodeInfo(String workerNode) {
+    public Optional<WorkerHeartBeat> getWorkerNodeInfo(String 
workerServerAddress) {
         workerNodeInfoReadLock.lock();
         try {
-            return workerNodeInfo.getOrDefault(workerNode, null);
+            return 
Optional.ofNullable(workerNodeInfo.getOrDefault(workerServerAddress, null));
         } finally {
             workerNodeInfoReadLock.unlock();
         }
     }
 
-    /**
-     * sync worker node info
-     *
-     * @param newWorkerNodeInfo new worker node info
-     */
-    private void syncAllWorkerNodeInfo(Map<String, String> newWorkerNodeInfo) {
-        workerNodeInfoWriteLock.lock();
-        try {
-            workerNodeInfo.clear();
-            for (Map.Entry<String, String> entry : 
newWorkerNodeInfo.entrySet()) {
-                workerNodeInfo.put(entry.getKey(), 
JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
-            }
-        } finally {
-            workerNodeInfoWriteLock.unlock();
-        }
-    }
-
-    /**
-     * sync single worker node info
-     */
-    private void syncSingleWorkerNodeInfo(String node, WorkerHeartBeat info) {
+    private void syncSingleWorkerNodeInfo(String workerAddress, 
WorkerHeartBeat info) {
         workerNodeInfoWriteLock.lock();
         try {
-            workerNodeInfo.put(node, info);
+            workerNodeInfo.put(workerAddress, info);
         } finally {
             workerNodeInfoWriteLock.unlock();
         }
@@ -434,9 +389,6 @@ public class ServerNodeManager implements InitializingBean {
         }
     }
 
-    /**
-     * destroy
-     */
     @PreDestroy
     public void destroy() {
         executorService.shutdownNow();
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index 08b7e4cc23..52281106ce 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -97,6 +97,9 @@ public class MasterSchedulerBootstrap extends 
BaseDaemonThread implements AutoCl
     @Autowired
     private WorkflowEventLooper workflowEventLooper;
 
+    @Autowired
+    private ServerNodeManager serverNodeManager;
+
     private String masterAddress;
 
     protected MasterSchedulerBootstrap() {
@@ -243,8 +246,8 @@ public class MasterSchedulerBootstrap extends 
BaseDaemonThread implements AutoCl
     private List<Command> findCommands() throws MasterException {
         try {
             long scheduleStartTime = System.currentTimeMillis();
-            int thisMasterSlot = ServerNodeManager.getSlot();
-            int masterCount = ServerNodeManager.getMasterSize();
+            int thisMasterSlot = serverNodeManager.getSlot();
+            int masterCount = serverNodeManager.getMasterSize();
             if (masterCount <= 0) {
                 logger.warn("Master count: {} is invalid, the current slot: 
{}", masterCount, thisMasterSlot);
                 return Collections.emptyList();
@@ -266,8 +269,8 @@ public class MasterSchedulerBootstrap extends 
BaseDaemonThread implements AutoCl
     }
 
     private SlotCheckState slotCheck(Command command) {
-        int slot = ServerNodeManager.getSlot();
-        int masterSize = ServerNodeManager.getMasterSize();
+        int slot = serverNodeManager.getSlot();
+        int masterSize = serverNodeManager.getMasterSize();
         SlotCheckState state;
         if (masterSize <= 0) {
             state = SlotCheckState.CHANGE;
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml 
b/dolphinscheduler-master/src/main/resources/application.yaml
index 1e5ba5d9b9..0bc55c8b6f 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -115,6 +115,7 @@ master:
     strategy: waiting
     # The max waiting time to reconnect to registry if you set the strategy to 
waiting
     max-waiting-time: 100s
+  worker-group-refresh-interval: 10s
 
 server:
   port: 5679
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
index c0c554e43b..d566f0e69a 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
@@ -22,6 +22,9 @@ import org.apache.dolphinscheduler.remote.utils.Host;
 import 
org.apache.dolphinscheduler.server.master.dispatch.ExecutionContextTestUtils;
 import 
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
+
+import java.util.Optional;
+
 import org.assertj.core.util.Strings;
 import org.junit.Assert;
 import org.junit.Test;
@@ -56,7 +59,8 @@ public class RoundRobinHostManagerTest {
     @Test
     public void testSelectWithResult() {
         
Mockito.when(serverNodeManager.getWorkerGroupNodes("default")).thenReturn(Sets.newHashSet("192.168.1.1:22"));
-        
Mockito.when(serverNodeManager.getWorkerNodeInfo("192.168.1.1:22")).thenReturn(new
 WorkerHeartBeat());
+        Mockito.when(serverNodeManager.getWorkerNodeInfo("192.168.1.1:22"))
+                .thenReturn(Optional.of(new WorkerHeartBeat()));
         ExecutionContext context = 
ExecutionContextTestUtils.getExecutionContext(10000);
         Host host = roundRobinHostManager.select(context);
         Assert.assertTrue(!Strings.isNullOrEmpty(host.getAddress()));
diff --git 
a/dolphinscheduler-standalone-server/src/main/resources/application.yaml 
b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index 9a7b455956..a97d03e34c 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -149,6 +149,7 @@ master:
   failover-interval: 10m
   # kill yarn jon when failover taskInstance, default true
   kill-yarn-job-when-task-failover: true
+  worker-group-refresh-interval: 10s
 
 worker:
   # worker listener port

Reply via email to