This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 3.0.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.0.1-prepare by this push:
     new ab8f878631 fix heartBeatTaskCount bug (#12024)
ab8f878631 is described below

commit ab8f87863127485b986c656fa1c532a43e46b852
Author: JinYong Li <[email protected]>
AuthorDate: Sun Sep 18 11:49:21 2022 +0800

    fix heartBeatTaskCount bug (#12024)
---
 .../master/registry/MasterHeartBeatTask.java       | 59 +++++-----------------
 .../master/registry/MasterRegistryClient.java      | 19 +++----
 .../worker/registry/WorkerHeartBeatTask.java       | 47 ++++-------------
 .../worker/registry/WorkerRegistryClient.java      | 21 ++++----
 4 files changed, 39 insertions(+), 107 deletions(-)

diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
similarity index 50%
copy from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
copy to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
index c84abb4182..2bcbad2616 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.registry;
+package org.apache.dolphinscheduler.server.master.registry;
 
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.HeartBeat;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
@@ -27,52 +28,25 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Heart beat task
+ * Master heart beat task
  */
-public class HeartBeatTask implements Runnable {
+public class MasterHeartBeatTask implements Runnable {
 
-    private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
+    private final Logger logger = 
LoggerFactory.getLogger(MasterHeartBeatTask.class);
 
     private final Set<String> heartBeatPaths;
     private final RegistryClient registryClient;
-    private int workerWaitingTaskCount;
-    private final String serverType;
     private final HeartBeat heartBeat;
-
-    private final int heartBeatErrorThreshold;
-
     private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
 
-    public HeartBeatTask(long startupTime,
-                         double maxCpuloadAvg,
-                         double reservedMemory,
-                         Set<String> heartBeatPaths,
-                         String serverType,
-                         RegistryClient registryClient,
-                         int heartBeatErrorThreshold) {
+    public MasterHeartBeatTask(long startupTime,
+                               double maxCpuloadAvg,
+                               double reservedMemory,
+                               Set<String> heartBeatPaths,
+                               RegistryClient registryClient) {
         this.heartBeatPaths = heartBeatPaths;
         this.registryClient = registryClient;
-        this.serverType = serverType;
         this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, 
reservedMemory);
-        this.heartBeatErrorThreshold = heartBeatErrorThreshold;
-    }
-
-    public HeartBeatTask(long startupTime,
-                         double maxCpuloadAvg,
-                         double reservedMemory,
-                         int hostWeight,
-                         Set<String> heartBeatPaths,
-                         String serverType,
-                         RegistryClient registryClient,
-                         int workerThreadCount,
-                         int workerWaitingTaskCount,
-                         int heartBeatErrorThreshold) {
-        this.heartBeatPaths = heartBeatPaths;
-        this.registryClient = registryClient;
-        this.workerWaitingTaskCount = workerWaitingTaskCount;
-        this.serverType = serverType;
-        this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, 
reservedMemory, hostWeight, workerThreadCount);
-        this.heartBeatErrorThreshold = heartBeatErrorThreshold;
     }
 
     public String getHeartBeatInfo() {
@@ -82,27 +56,18 @@ public class HeartBeatTask implements Runnable {
     @Override
     public void run() {
         try {
-            // check dead or not in zookeeper
             for (String heartBeatPath : heartBeatPaths) {
-                if (registryClient.checkIsDeadServer(heartBeatPath, 
serverType)) {
+                if (registryClient.checkIsDeadServer(heartBeatPath, 
Constants.MASTER_TYPE)) {
                     registryClient.getStoppable().stop("i was judged to death, 
release resources and stop myself");
                     return;
                 }
             }
-
-            // update waiting task count
-            heartBeat.setWorkerWaitingTaskCount(workerWaitingTaskCount);
-
             for (String heartBeatPath : heartBeatPaths) {
                 registryClient.persistEphemeral(heartBeatPath, 
heartBeat.encodeHeartBeat());
             }
             heartBeatErrorTimes.set(0);
         } catch (Throwable ex) {
-            logger.error("HeartBeat task execute failed", ex);
-            if (heartBeatErrorTimes.incrementAndGet() >= 
heartBeatErrorThreshold) {
-                registryClient.getStoppable()
-                              .stop("HeartBeat task connect to zk failed too 
much times: " + heartBeatErrorTimes);
-            }
+            logger.error("HeartBeat task execute failed, errorTimes: {}", 
heartBeatErrorTimes.get(), ex);
         }
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index a59b712828..56167f65b4 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -30,7 +30,6 @@ import 
org.apache.dolphinscheduler.registry.api.RegistryException;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.service.FailoverService;
-import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import org.apache.commons.lang3.StringUtils;
@@ -50,7 +49,7 @@ import com.google.common.collect.Sets;
 
 /**
  * <p>DolphinScheduler master register client, used to connect to registry and 
hand the registry events.
- * <p>When the Master node startup, it will register in registry center. And 
schedule a {@link HeartBeatTask} to update its metadata in registry.
+ * <p>When the Master node startup, it will register in registry center. And 
schedule a {@link MasterHeartBeatTask} to update its metadata in registry.
  */
 @Component
 public class MasterRegistryClient implements AutoCloseable {
@@ -97,7 +96,7 @@ public class MasterRegistryClient implements AutoCloseable {
             // master registry
             registry();
             registryClient.addConnectionStateListener(new 
MasterConnectionStateListener(getCurrentNodePath(),
-                                                                               
         registryClient));
+                registryClient));
             registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new 
MasterRegistryDataListener());
         } catch (Exception e) {
             throw new RegistryException("Master registry client start up 
error", e);
@@ -190,13 +189,11 @@ public class MasterRegistryClient implements 
AutoCloseable {
         logger.info("Master node : {} registering to registry center", 
masterAddress);
         String localNodePath = getCurrentNodePath();
         Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
-        HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
-                                                        
masterConfig.getMaxCpuLoadAvg(),
-                                                        
masterConfig.getReservedMemory(),
-                                                        
Sets.newHashSet(localNodePath),
-                                                        Constants.MASTER_TYPE,
-                                                        registryClient,
-                                                        
masterConfig.getHeartbeatErrorThreshold());
+        MasterHeartBeatTask heartBeatTask = new 
MasterHeartBeatTask(startupTime,
+            masterConfig.getMaxCpuLoadAvg(),
+            masterConfig.getReservedMemory(),
+            Sets.newHashSet(localNodePath),
+            registryClient);
 
         // remove before persist
         registryClient.remove(localNodePath);
@@ -247,4 +244,4 @@ public class MasterRegistryClient implements AutoCloseable {
         return NetUtils.getAddr(masterConfig.getListenPort());
     }
 
-}
+}
\ No newline at end of file
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java
similarity index 63%
rename from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
rename to 
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java
index c84abb4182..72315a67b6 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java
@@ -15,64 +15,43 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.registry;
+package org.apache.dolphinscheduler.server.worker.registry;
 
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.HeartBeat;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * Heart beat task
  */
-public class HeartBeatTask implements Runnable {
+public class WorkerHeartBeatTask implements Runnable {
 
-    private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
+    private final Logger logger = 
LoggerFactory.getLogger(WorkerHeartBeatTask.class);
 
     private final Set<String> heartBeatPaths;
     private final RegistryClient registryClient;
     private int workerWaitingTaskCount;
-    private final String serverType;
     private final HeartBeat heartBeat;
 
-    private final int heartBeatErrorThreshold;
-
     private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
 
-    public HeartBeatTask(long startupTime,
-                         double maxCpuloadAvg,
-                         double reservedMemory,
-                         Set<String> heartBeatPaths,
-                         String serverType,
-                         RegistryClient registryClient,
-                         int heartBeatErrorThreshold) {
-        this.heartBeatPaths = heartBeatPaths;
-        this.registryClient = registryClient;
-        this.serverType = serverType;
-        this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, 
reservedMemory);
-        this.heartBeatErrorThreshold = heartBeatErrorThreshold;
-    }
-
-    public HeartBeatTask(long startupTime,
+    public WorkerHeartBeatTask(long startupTime,
                          double maxCpuloadAvg,
                          double reservedMemory,
                          int hostWeight,
                          Set<String> heartBeatPaths,
-                         String serverType,
                          RegistryClient registryClient,
                          int workerThreadCount,
-                         int workerWaitingTaskCount,
-                         int heartBeatErrorThreshold) {
+                         int workerWaitingTaskCount) {
         this.heartBeatPaths = heartBeatPaths;
         this.registryClient = registryClient;
         this.workerWaitingTaskCount = workerWaitingTaskCount;
-        this.serverType = serverType;
         this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, 
reservedMemory, hostWeight, workerThreadCount);
-        this.heartBeatErrorThreshold = heartBeatErrorThreshold;
     }
 
     public String getHeartBeatInfo() {
@@ -82,14 +61,12 @@ public class HeartBeatTask implements Runnable {
     @Override
     public void run() {
         try {
-            // check dead or not in zookeeper
             for (String heartBeatPath : heartBeatPaths) {
-                if (registryClient.checkIsDeadServer(heartBeatPath, 
serverType)) {
+                if (registryClient.checkIsDeadServer(heartBeatPath, 
Constants.WORKER_TYPE)) {
                     registryClient.getStoppable().stop("i was judged to death, 
release resources and stop myself");
                     return;
                 }
             }
-
             // update waiting task count
             heartBeat.setWorkerWaitingTaskCount(workerWaitingTaskCount);
 
@@ -98,11 +75,7 @@ public class HeartBeatTask implements Runnable {
             }
             heartBeatErrorTimes.set(0);
         } catch (Throwable ex) {
-            logger.error("HeartBeat task execute failed", ex);
-            if (heartBeatErrorTimes.incrementAndGet() >= 
heartBeatErrorThreshold) {
-                registryClient.getStoppable()
-                              .stop("HeartBeat task connect to zk failed too 
much times: " + heartBeatErrorTimes);
-            }
+            logger.error("HeartBeat task execute failed, errorTimes: {}", 
heartBeatErrorTimes.get(), ex);
         }
     }
 }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index 99d18acc30..6aff4dfb19 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
-import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
@@ -101,16 +100,14 @@ public class WorkerRegistryClient implements 
AutoCloseable {
         Set<String> workerZkPaths = getWorkerZkPaths();
         long workerHeartbeatInterval = 
workerConfig.getHeartbeatInterval().getSeconds();
 
-        HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
-                                                        
workerConfig.getMaxCpuLoadAvg(),
-                                                        
workerConfig.getReservedMemory(),
-                                                        
workerConfig.getHostWeight(),
-                                                        workerZkPaths,
-                                                        Constants.WORKER_TYPE,
-                                                        registryClient,
-                                                        
workerConfig.getExecThreads(),
-                                                        
workerManagerThread.getThreadPoolQueueSize(),
-                                                        
workerConfig.getHeartbeatErrorThreshold());
+        WorkerHeartBeatTask heartBeatTask = new 
WorkerHeartBeatTask(startupTime,
+            workerConfig.getMaxCpuLoadAvg(),
+            workerConfig.getReservedMemory(),
+            workerConfig.getHostWeight(),
+            workerZkPaths,
+            registryClient,
+            workerConfig.getExecThreads(),
+            workerManagerThread.getThreadPoolQueueSize());
 
         for (String workerZKPath : workerZkPaths) {
             // remove before persist
@@ -199,4 +196,4 @@ public class WorkerRegistryClient implements AutoCloseable {
         unRegistry();
     }
 
-}
+}
\ No newline at end of file

Reply via email to